ENH: create out-of-core processing module #3202

Closed
jreback opened this Issue Mar 28, 2013 · 32 comments


@jreback
Collaborator
jreback commented Mar 28, 2013

Conceptually, create a pipeline processor that performs out-of-core computation.
This is easily parallelizable (multi-core or across machines); in theory cython / ipython / joblib / hadoop could operate with this

requirements

the data set must support chunking, and the function must operate only on that chunk
Useful in cases with a large number of rows, or a problem that you want to parallelize.

input

a chunking iterator that reads from disk (could take chunksize parameters, or
take a handle and just call the iterator as well)

  • read_csv
  • HDFStore

function

take an iterated chunk, and an axis and return another pandas object
(could be a reduction, transformation, whatever)

output

an output mechanism to take the function application, must support appending

  • to_csv (with appending)
  • HDFStore (table)
  • another pipeline
  • in memory

Map-reduce is an example of this type of pipelining.
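The read → apply → append flow described above can be sketched with plain pandas today; `run_pipeline`, the file names, and the reducing function here are illustrative, not an existing API:

```python
# A sketch of the read -> apply -> append pipeline described above,
# using the existing read_csv chunking iterator.
import pandas as pd

def run_pipeline(src, dst, func, chunksize=50000):
    """Stream chunks from a csv, apply func to each, append results to dst."""
    header = True
    for chunk in pd.read_csv(src, chunksize=chunksize):
        result = func(chunk)
        result.to_csv(dst, mode='a', header=header)
        header = False  # write the header only once
```

A real implementation would also need the combine step (re-aggregating the appended partial results), which this sketch leaves to the caller.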

Interesting Library
https://github.com/enthought/distarray

@Zelazny7

This is the functionality that pandas currently lacks that is preventing me from ditching SAS. This is a monumental task but so worth the effort! I'm very happy to see it being discussed. Is there any possibility of collaborating with this project? http://continuum.io/blog/blaze

@wesm
Member
wesm commented Mar 29, 2013

The timeline for when something production-ready is going to ship there is unclear but worth keeping an eye on for collaboration possibilities.

@jreback
Collaborator
jreback commented Mar 29, 2013

@Zelazny7 could u put up some example calculations?
obviously doesn't have to be very big and can use random data

@Zelazny7

For example, finding the within-group sum for a dataframe could be broken up iteratively like so. This is all done in memory, but the same concept could be applied to a file stored on disk and read in piecemeal. The purpose of this enhancement, I hope, is to make the management of the IO and chunking transparent to the user:

def gen(df, chunksize=50000):
    nrows = len(df)
    current = 0
    while current < nrows:
        yield df.iloc[current:current + chunksize]
        current += chunksize

res = pd.concat([chunk.groupby('YearMade').sum() for chunk in gen(df)])
res.groupby(res.index).sum().head()

          SalePrice
YearMade
1000      781142325
1919        2210750
1920         493250
1937          18000
1942          51000

This is equivalent to:

df.groupby('YearMade').sum().head()
          SalePrice
YearMade
1000      781142325
1919        2210750
1920         493250
1937          18000
1942          51000

I tried using reduce and combine functions to achieve a similar result, but I don't know enough pandas/python to understand why it was failing. Interestingly, a time comparison of the two results shows the method using a generator is slightly faster:

def test():
    res = pd.concat([chunk.groupby('YearMade').sum() for chunk in gen(df)])
    res.groupby(res.index).sum()

In [1]: %timeit test()
10 loops, best of 3: 31.2 ms per loop

In [2]: %timeit df.groupby('YearMade').sum()
10 loops, best of 3: 32.8 ms per loop

There is a broad class of calculations that can be performed on chunks of the data and the results collated together. The sum example shown above is one such case. This can easily be extended to the mean, variance, and standard deviation. Medians and quantiles are a bit trickier, and certain kinds of regression would have to fall back on gradient-descent algorithms rather than closed-form solutions.
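As a concrete illustration of extending the chunked sum to mean and variance, one can carry a count, a running sum, and a running sum of squares across chunks (a sketch, not an existing pandas API; note the sum-of-squares form can lose precision on badly scaled data):

```python
# Accumulate count, sum, and sum of squares across chunks, then derive
# the mean and (population) variance at the end.
def chunked_mean_var(chunks):
    n, s, ss = 0, 0.0, 0.0
    for chunk in chunks:
        n += len(chunk)
        s += chunk.sum()
        ss += (chunk ** 2).sum()
    mean = s / n
    var = ss / n - mean ** 2
    return mean, var
```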

Edited to add that scikit-learn has several algorithms that implement the partial_fit method. These functions are designed to work on chunks of data like this.

BTW, I pulled the data from Kaggle: http://www.kaggle.com/c/bluebook-for-bulldozers

@jreback
Collaborator
jreback commented Mar 29, 2013

@Zelazny7

I think we could start off with an API something like this (pseudo codeish here):

Create pipes

pipe = pipeline.Pipe(handle_or_iterator_or_pandas_object,
                     func, axis, name, chunksize=50000,
                     method='single_core', location=dir_to_save_data,
                     combine=None, partition=None)
  handle_or_iterator_or_object : either an opened read_csv/HDFStore iterator
            (or one that is opened/closed for you when done), or a pandas object (e.g. a Frame)
  func : a function taking a single argument (plus passed kwargs) to evaluate;
            can return a similarly sized object (e.g. a map) or a reduction
  axis : the slicing axis (how we partition)
  name : necessary to save any intermediate data (in our location); I suppose this could be optional
              and just save in temp dirs of the location (which itself could be a temp dir)
  chunksize : the size of a particular chunk
  partition : an optional partitioning function (in lieu of chunksize / axis)
  method : how to execute this evaluation, either on a chunk-by-chunk basis (single_core),
                 multi-process (multi_core), or distributed (TBD); this possibly could
                 instead be set by the class (e.g. Pipe is the base, with SingleCore, MultiCore, Distributed)
  location : if saving data (e.g. in a map/transform case), where to save each chunk
  combine : a combiner function (trivial in the case of sum, but could be somewhat complicated;
                   maybe do this as a separate pipe)

Eval a pipe (and return a new pipe of this computation)

returned_pipe = pipe.eval(**kwargs)

Get results

df = pipe.get_results()
pipe.write_results(output_handle)

Cleanup (delete temporary files); need to figure out how to make this cleanup happen nicely

pipe.clean()

the reason for the eval and get_results split would be to allow chaining and potentially lazy evaluation
one could call get_results, I think, with the same args as eval (which in effect just uses a single pipe)

I think we could support methods of single_core, and probably multi_core using the multiprocessing module

so I think that we could write your computation like this

pipeline.Pipe(df).get_results(lambda x: x.groupby('YearMade').sum())

if df happens to be on disk

pipeline.Pipe('cool.csv').get_results(lambda x: x.groupby('YearMade').sum())

Alternatively

  1. could have separate methods map (or apply) and reduce for evaluation
  2. could support specific methods e.g. groupby, sum, mean

Could do some of all three

We usually use apply for a map-type operation (and reduction for that matter).
eval is currently not really used in pandas (but maybe it makes sense to introduce it
to signal that we are doing something special here).

any suggestions for names? pipeline is just sort of random....

@jreback
Collaborator
jreback commented Mar 29, 2013

This probably also should be done with a decorator, mainly to facilitate open/close context of the file
I think I have seen syntax similar to this in celery (distributed task queues)

http://docs.celeryproject.org/en/latest/getting-started/introduction.html

@pipeline.Pipeline(chunksize=50000, method='single_core')
def func(x):
    return x.groupby('YearMade').sum()

Then this will work (I think)

df = func('cool.csv')

@jreback
Collaborator
jreback commented Mar 29, 2013

having read_csv provide start/stop parameters so we can read a specific chunk from disk, see #3221

PyTables evaluator; we can provide simple computations through this directly

numpy w/multi-processing

things to take note:

mclapply in R

Creating shared memory with multiprocessing

import ctypes
import multiprocessing

import numpy as np

# allocate a process-shared double array, then view it as a numpy array
mp_arr = multiprocessing.Array(ctypes.c_double, 100)
arr = np.frombuffer(mp_arr.get_obj())

From here: http://mail.scipy.org/pipermail/scipy-user/2013-April/034415.html

Another good Ref
http://stackoverflow.com/questions/15976937/making-my-numpy-array-shared-across-processes

@jreback
Collaborator
jreback commented Apr 4, 2013

answered in this question:
http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore

2 methods for groupby

import numpy as np
import pandas as pd
import os

fname = 'groupby.h5'

# create a frame
df = pd.DataFrame({'A': ['foo', 'foo', 'foo', 'foo',
                         'bar', 'bar', 'bar', 'bar',
                         'foo', 'foo', 'foo'],
                   'B': ['one', 'one', 'one', 'two',
                         'one', 'one', 'one', 'two',
                         'two', 'two', 'one'],
                   'C': ['dull', 'dull', 'shiny', 'dull',
                         'dull', 'shiny', 'shiny', 'dull',
                         'shiny', 'shiny', 'shiny'],
                   'D': np.random.randn(11),
                   'E': np.random.randn(11),
                   'F': np.random.randn(11)})


# create the store and append, using data_columns where I possibly
# could aggregate
df.to_hdf(fname, 'df', mode='w', format='table', data_columns=['A', 'B', 'C'])

# method 1
with pd.HDFStore(fname) as store:

    # get the groups
    groups = store.select_column('df', 'A').unique()

    # iterate over the groups and apply my operations
    l = []
    for g in groups:

        # select the group
        grp = store.select('df', where='A == %r' % str(g))

        # this is a regular frame, aggregate however you would like
        l.append(grp[['D', 'E', 'F']].sum())

    print("\nresult (1):\n%s" % pd.concat(l, keys=list(groups)))

# method 2 (only going to work for sum; for mean you need to track the running count as well)
with pd.HDFStore(fname) as store:

    l = []
    for chunk in store.select('df', chunksize=3):
        l.append(chunk.groupby('A').sum())

    print("\nresult (2):\n%s" % pd.concat(l).reset_index().groupby('A').sum().stack())
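For the mean case mentioned in the method-2 comment, carrying the per-group running sum and count across chunks looks like this (an illustrative helper, not a pandas API):

```python
import pandas as pd

def chunked_group_mean(chunks, key, col):
    # accumulate per-group sum and count across chunks, divide at the end
    sums, counts = None, None
    for chunk in chunks:
        g = chunk.groupby(key)[col]
        s, c = g.sum(), g.count()
        sums = s if sums is None else sums.add(s, fill_value=0)
        counts = c if counts is None else counts.add(c, fill_value=0)
    return sums / counts
```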
@Zelazny7
Zelazny7 commented Apr 8, 2013

Functionality like the ff package from R would go a long way towards making pandas easier to use with large-ish data: http://cran.r-project.org/web/packages/ff/index.html

HDFStore is great, but not being able to transparently select columns and, more importantly, append new columns makes it difficult to use efficiently. ff essentially stores a dataframe on disk. Users can bring portions of the dataframe into memory using the same syntax as an in-memory dataframe. This is why I call it transparent. Furthermore, it is very easy to "write" a new column by using the same syntax to assign a new column to an in-memory dataframe.

This functionality alone - being able to access elements of an on-disk dataframe - would be a huge win for pandas and could be implemented more quickly than the sequential processes discussed above. I think this would be a good stepping stone on the way towards out-of-core computations. I think it's reasonable for users to pull subsets of their data into memory for manipulation. It becomes a pain when we have to manage the i/o operations. This would alleviate that pain-point.

@jreback
Collaborator
jreback commented Apr 8, 2013

you can now select columns easily with read_column (in 0.11 - docs are not built yet)

adding columns efficiently is quite difficult: you would need to store data column-wise, and then adding rows would be difficult. You have to pick. (That said, we possibly could support columnar access via ctable; it's not that different an interface, but it suffers from exactly the same problem.)

That said, you can easily add columns if you disaggregate your table, e.g. store a column per group.

The functionality you are asking for here is basically a numpy memmap. The tricky issue is exactly how you would use this; can you give me a pseudo-code example of how you would go about accessing it?

@Zelazny7
Zelazny7 commented Apr 8, 2013

In the predictive modeling world (think FICO and other credit scores), it's pretty much always a big table of heterogeneous data. I would love to use pandas like this:

# read in a csv (or other text file) using awesome pandas parser, but store on disk
df_on_disk = pd.read_csv('somefile.csv', dtypes=dtypes, mode='disk')

# select columns into memory
col_in_mem = df_on_disk['col_that_I_need']

# multiple columns
cols_in_mem = df_on_disk[['col1','col2','col8']]

# writing to disk using column in memory
modified_col_in_mem = cool_function(col_in_mem)
df_on_disk['new_col'] = modified_col_in_mem

All of the cool indexing that pandas handles in-memory could be ignored for the on-disk dataframe. As long as it has the same number of rows, a new column would be slapped onto the end.

@Zelazny7
Zelazny7 commented Apr 8, 2013

I'm starting to understand the difficulty of quickly accessing and storing columns like this. To accomplish it using pd.read_csv, one pass for every column would have to be performed. Still, it would only have to be done once. The columns would have to be written to a mmap file in sequential order. Storing metadata about the columns and types would allow the mmap to be quickly accessed and return pandas objects. I think I'll toy around with creating something like this and see how it goes.

@jreback
Collaborator
jreback commented Apr 8, 2013

If I remember your original question on SO correctly, you just need to write the table (as is) to HDFStore, with whatever data columns you want (or multiple tables if it's very wide), then you just write columns to another group/groups as needed. There is some bookkeeping in order to track what is where, so when you do a query you are pulling in the correct groups (and not selecting on certain ones), but this should be quite fast. And every once in a while you could rebuild the store into a new one (conceptually 'packing' it).

It's kind of like a distributed file system: you write, and the system stores it where convenient. It has a map to tell you where everything is (kind of like a table of contents). The user of the system doesn't care exactly where the data is, just that they can do operations on it. Appends of rows are a bit more complicated because you have to split the row and append to multiple groups. Column appends are easy. Even deletes of columns can be handled (just remember that the column is deleted).

You ideally want locality of data when you select, e.g. if you typically want a bunch of columns at the same time, then store them in a table together.

I think we could put a wrapper around this and make it pretty transparent. Wrap the selection mechanism and you could effectively have an out-of-core dataframe.
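A toy version of that wrapper idea, with one file per column standing in for one group per column (pickle files are used here purely for illustration; an HDF5 group per column would work the same way):

```python
import os
import pandas as pd

def put_column(dirname, name, series):
    # one file per column, so adding a column never rewrites the others
    series.to_frame(name).to_pickle(os.path.join(dirname, name + '.pkl'))

def get_columns(dirname, names):
    # pull only the requested columns into memory
    frames = [pd.read_pickle(os.path.join(dirname, n + '.pkl')) for n in names]
    return pd.concat(frames, axis=1)
```

The bookkeeping layer described above would sit on top of this, tracking which columns live where.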

@jreback
Collaborator
jreback commented Apr 18, 2013

@Zelazny7 I lost the question where we were talking about column-wise tables...

you might find this link interesting
https://github.com/PyTables/proposal/blob/master/column-wise-pytables.rst

I actually think we could implement this directly (and not wait for pytables to do it) via ctable

@Zelazny7

That does seem promising. Having an on-disk, column-oriented datastore for pandas would be a huge improvement for a large class of use-cases. Even a simple interface for storing/appending/retrieving would be enough to start using pandas in earnest (again for my use case). In the meantime, I've had to switch to R with the FF package. It is pretty much exactly what is described.

The FF package only handles the storage and retrieval of R column vectors. This is a nice addition on its own, as it allows the user to choose what is brought into memory. However, another project quickly created the FFBase package, which provides special forms of all the commonly used functions that work specifically on the on-disk FF objects.

Perhaps a similar approach could be taken with Pandas. I wish I was a good enough programmer to help develop, but I can definitely help test any ideas and provide user feedback. Thanks again for your hard work.

@user1827356

@Zelazny7 I'm not sure a column oriented database is a complete solution for the use-case you mentioned - df.groupby('YearMade').sum().head() when df is very large. For example, you would still run into the issue if each column is very large

A proposal for my use cases, which may also cover yours

Assume a csv with columns ['col1', 'col2', 'col3' ...]

To store this to file you would do

pd.csv_to_chunk('<directory>', chunk_by = ['colx', 'coly' ...]) # this could also allow chunking by row number

This would store it as follows:

<directory>/colx/coly/../col1.data
<directory>/colx/coly/../col2.data
...
<directory>/colx/coly/../colN.data

This would then allow you to do the following:

pd.apply_by_chunk('<directory>', processing_func_that_receives_dataframe)

This is a first-cut API. If you cache the 'chunk map' as metadata, then other interesting possibilities emerge

cf = pd.read_chunk_frame('<directory>')
cf.apply('colx' == 'valx', processing_func_that_receives_dataframe)

Making row/column insertion transparent might be tricky, but possible
The API can also be extended for multi-core/multi-host processing (latter assuming files are on NFS)

Given the huge effort required in an overhaul like blaze, the effort required for this might be considerably smaller

Caveat: My python/big data knowledge is small but growing :) So if I've missed something obvious, feel free not to mince words

@jreback
Collaborator
jreback commented May 2, 2013

@user1827356

you can already do this with the csv iterator (or even better with the HDF iterator), both are row-oriented

see docs http://pandas.pydata.org/pandas-docs/dev/io.html#iterating-through-files-chunk-by-chunk, and http://pandas.pydata.org/pandas-docs/dev/io.html#iterator

@Zelazny7 is working specifically with column-oriented data, in that you need to add/delete columns, which row-oriented storage doesn't support as well, because a) you need to track the adds/deletes (and have a layer that allows you to do this transparently), and b) it brings in ALL data in a row whenever you bring that row in; in a very wide table this can be somewhat inefficient. A partial solution is this: http://pandas.pydata.org/pandas-docs/dev/io.html#multiple-table-queries

what is the problem you are trying to address?

@user1827356

@jreback

As already noted (in your case a), reading chunk by chunk does not solve adding/deleting columns

I was trying to point out that a column-oriented database by itself does not completely solve the problem of large data sets (a column might be too large to load into memory).

Hence, you need to chunk ROWS AS WELL AS COLUMNS

IMO with large datasets, the challenge is how to 'chunk' the data. If there were a generic way to do it like the one proposed in blaze (my opinion comes from this - http://vimeo.com/53031980), that would be great

But allowing a user who is knowledgeable about the dataset to specify the dimensions along which to chunk would be best. To explain:

Scenario 1: A user like Zelazny7 might want to 'chunk' the dataset merely by row number i.e. 10000 rows etc
Scenario 2: In finance, one might want to chunk by 'stock' or by 'date' or both

As far as case (b) goes, the API I suggested would clearly help as it could be extended to allow the user to specify which of the columns she wants passed to the function

IMO providing an API like the above, although not generic, would be easier to implement and hence have a larger impact sooner.

@jreback
Collaborator
jreback commented May 2, 2013

@user1827356

what you are suggesting is quite straightforward with HDFStore today. Write your dataset as a table, and use an iterator (possibly with a query). You will then chunk by rows (or by whatever is in your query). I do this all the time by lining my data up in the appropriate way.

Using your example I keep a panel of: items x dates x ids
Then it is stored as a 2-d indexable with the columns of items, and dates x ids in the rows. Then when you chunk you get what you want.
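The layout described, items as columns with dates x ids in the rows, can be sketched with a MultiIndex (the sample data here is invented):

```python
import numpy as np
import pandas as pd

dates = pd.date_range('2013-01-01', periods=3)
ids = ['a', 'b']
idx = pd.MultiIndex.from_product([dates, ids], names=['date', 'id'])

# items (here: price, volume) as columns; dates x ids in the rows
df = pd.DataFrame(np.arange(12.0).reshape(6, 2), index=idx,
                  columns=['price', 'volume'])

# chunking over rows now slices whole dates at a time
first_day = df.loc[dates[0]]
```

Because the rows are sorted by date, a row-wise chunk out of the store corresponds to a contiguous block of dates, which is exactly the chunking-by-'date' scenario above.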

I also do this in a 4-d case, providing arbitrary axes support. See http://pandas.pydata.org/pandas-docs/dev/io.html#experimental.

@user1827356

@jreback

To be honest, I haven't paid much attention to HDFStore thus far. I'll go through it and see if it fits my needs. Thanks

@user1827356

@jreback - I briefly reviewed HDFStore and it seems to me that it's applicable to out-of-core computing more than to multi-process or multi-host processing. For example, having centralized metadata is going to be challenging when you have multiple parallel writers. Is there a way to do this with HDFStore? Or maybe there is a way to split the dataset over multiple files?

@jreback
Collaborator
jreback commented May 8, 2013

Parallel writes are not supported (the docs warn you away from this quite a bit), but you can easily read in parallel. It is quite straightforward to avoid parallel writes, however; if you need a combine step, then do that synchronously and continue on.

I can often parallelize (either multi-core or distributed) processes: collect the results, combine, then spin off more processes to do the next step. This is a fairly typical way of doing things.

HDFStore could easily support this method of computation. So I am not exactly sure what you are asking.

@user1827356

I don't follow

  • 'quite straight forward to avoid parallel writes'
  • 'collect the results, combine'

When you parallelize the task, how do you store the intermediate results and combine them? One way would be to store them as different files and then combine them on read. Is that the suggestion?

@jreback
Collaborator
jreback commented May 8, 2013

yes, that's exactly what you would do. I don't know exactly what problem you are attempting, but for instance:

  1. split data, write to work file
  2. apply function to work file, generate results file
  3. combine results into new file
  4. repeat

1 and 3 are not parallelizable, but 2 could be distributed, multi-core, or single-core
(I am generalizing a bit; it is possible 1 and 3 could be done in parallel, but that's too complicated)

This is essentially a map-reduce paradigm
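A minimal in-process sketch of those steps (a thread pool stands in here for the distributed / multi-core apply step; the function names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def map_reduce(df, func, combine, chunksize=5, workers=2):
    # 1. split into row chunks
    chunks = [df.iloc[i:i + chunksize] for i in range(0, len(df), chunksize)]
    # 2. apply func to each chunk; this is the parallelizable step
    with ThreadPoolExecutor(workers) as pool:
        partials = list(pool.map(func, chunks))
    # 3. combine the partial results synchronously
    return combine(partials)
```

Swapping the thread pool for a process pool or a grid changes step 2 only; the split and combine steps stay serial, as described above.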

and here's my 2c: solve your problem in a general way single-core first; then, and only if it's not efficient, distribute / go multi-core. Too often people prematurely optimize!

@user1827356

OK, what you describe is close to the workflow I have. One of the differences is I use csv files instead of HDFStore

The problems I have with this workflow are:

  • It's too manual for different data sets (I have some parts of it reusable, but it's still hacked together)
  • It doesn't work intuitively with pandas (Interface should be like groupby-apply)
  • It doesn't work with multiple backends i.e. ipython, custom grid, multiprocessing etc.

I think it'll be a great addition to pandas to provide a transparent way to achieve this. In short, an out of the box interface that allows this would be great

@jreback
Collaborator
jreback commented May 8, 2013

the intent of this issue was to provide discussion related to the features of this, to in fact make it easier. This is a non-trivial problem as data computations of this nature can take several different forms.

I think supporting csv would be nice, but it certainly won't be on the first go (nor would you really want it anyhow; for a big file this is very inefficient, as you need to be able to slice chunks out of it. The good thing is that this is easy to fix by splitting the file or putting it in HDF5)

what do you mean by multiple backends? you really mean multiple methods of computation / distribution, correct?

what would help is a pseudo-code-like specification (maybe with some sample data) to get a good idea of what the different workflows are

@user1827356

@jreback
By different backends I do mean multiple methods of distribution


I also briefly reviewed this paper http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/pubs/archive/36632.pdf

IMO, a lot of its complexity has to do with the fact that the data they deal with is sparse. If we assume the data we deal with is dense, Figure 1 proposes a 'columnar representation of nested data'. I find it very interesting. I understand it might have some similarities to HDFStore, but the differences I see are:

  • It's columnar and hence, can support addition/deletion of columns
  • It's distributed and hence, can support multiple writes
  • To quote - 'As an added benefit, data in a file system can be conveniently manipulated using standard tools, e.g., to transfer to another cluster, change access privileges, or identify a subset of data
    for analysis based on file names'

As far as the API goes, let's assume a simple financial data problem
date, stock, price, qty
5/9/2013, A, 55.55, 100
5/9/2013, B, 99.99, 300
...
5/10/2013, A, 55.65, 100
5/10/2013, B, 99.89, 100

Now, I want to calculate VWAP (volume-weighted average price, per stock per day)
df.groupby(['date', 'stock']).apply(lambda sdf: (sdf.price * sdf.qty).sum() / sdf.qty.sum())

Usually there are ~8000 stocks, ~250 million rows/day, 60+ days of data

Ideally, I still want to keep a similar interface

def vwap(sdf): return (sdf.price * sdf.qty).sum() / sdf.qty.sum()

res = df.groupby(['date', 'stock']).apply(vwap, [optional arg(s) for distribution])

So

  • df should encapsulate data distributed over multiple files
  • apply should distribute data in grouped chunks over different jobs
  • apply should collate returned values if any
  • apply should optionally persist columns added/changed to sdf by function equivalent to vwap
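Under the usual definition VWAP = sum(price x qty) / sum(qty), the per-group computation factors cleanly over chunks, which is what makes an interface like the above feasible. A sketch, with column names as in the sample data:

```python
import pandas as pd

def chunked_vwap(chunks, keys=('date', 'stock')):
    # accumulate per-group price*qty and qty across chunks, divide at the end
    pq, q = None, None
    for chunk in chunks:
        g = chunk.assign(pq=chunk['price'] * chunk['qty']).groupby(list(keys))
        s_pq, s_q = g['pq'].sum(), g['qty'].sum()
        pq = s_pq if pq is None else pq.add(s_pq, fill_value=0)
        q = s_q if q is None else q.add(s_q, fill_value=0)
    return pq / q
```

The chunks here could just as well come from files distributed over multiple hosts, with only the two small accumulator series shipped back for the combine.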
@lrq3000
lrq3000 commented Jun 25, 2013

In my case, the approach presented here using HDF stores does not work when one needs to do computations on a whole column or a whole row of a huge store, since pandas will retrieve the row/column and store it in memory to do the processing.
Isn't there any way to overload the low-level cell/row/column getters/setters to make them work directly on/from a file on disk rather than loading the full data into memory?
I mean, for example, instead of providing computation functions with a DataFrame, would it be possible to provide them with a meta-DataFrame describing the DataFrame, but only providing the value of a cell on access? E.g. when computing the mean, or any operation in fact, you only need to retrieve the scalar value from 1 cell at a time.

@jreback
Collaborator
jreback commented Jun 26, 2013

retrieving only 1 value at a time is extremely inefficient. you can certainly slice an HDFStore now using start and stop, and shard the columns out to different groups so as to effectively have a column store, while being able to do computations piecewise.

The trick is that only a small group of computations can actually be done this way (e.g. even mean is tricky because you need to accumulate the sum and also the count).

The point of this issue is to wrap some of this functionality up in a way that is more transparent to the user.

@datnamer
datnamer commented Sep 3, 2014

Just wanted to enthusiastically +1 this.

Might be helpful also to take a look at the following aspects of the blaze project:

Dynd: https://github.com/ContinuumIO/dynd-python

Bcolz: https://github.com/Blosc/bcolz

@hadim
hadim commented Jun 1, 2015

+1 to subscribe to GitHub mail alerts.

@jreback
Collaborator
jreback commented Sep 15, 2016

closing as this is just dask

@jreback jreback closed this Sep 15, 2016