
More disk support? #15

Closed
root-11 opened this issue Feb 5, 2022 · 12 comments

root-11 commented Feb 5, 2022

Tablite - more (?) or better disk support?

The benefit of tablite is that it doesn't care whether the data is in RAM or on disk; the user's usage is the same. Right now SQLite is used for the disk operations.
That's OK, but it is limited to use with tempdir.

Sometimes users want to use another directory, store the tables in a particular state and then continue their work at a later point in time. Whilst tempdir can be used for that, tablite keeps all the data from memory in the same SQLite blob, so persisting individual tables means moving data, which costs time.

It is probably better to allow the user to:

  • Call Table.set_working_dir(pathlib.Path), which allows the user to keep all data there.

  • Set Table.metadata['localfile'] = "tbl_0.h5", which allows tablite to conclude that "working_dir/tbl_0.h5" is where the user wants the file. Even if the user doesn't want to manage the file name but just wants to be sure that the data doesn't cause a memory overflow, the user can do:

Table.new_tables_use_disk = True

Table.from_file(filename,...) and all files will be named in the metadata as:

Table.metadata['filename'] = filename1.csv
Table.metadata['localfile'] = working_dir/filename1.h5

If the file already is in h5 format, the data is merely "imported" by reading the h5 format. This requires only that:

  1. The table's metadata is stored in the h5 dataset's .attrs (which is dict-like anyway).
  2. Each column is stored independently according to its datatype.
import h5py

f = h5py.File("filename.h5", "w")

column_a = f.create_dataset("column A", data=[100, 99, 98, 97], dtype='i8')  # 4 records.
column_b = f.create_dataset("column B", data=[b"a", b"b", b"c", b"d"])       # 4 records.

And the metadata?
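
A minimal sketch of point 1, assuming the table's metadata is serialised to JSON and kept in the file's root .attrs (the keys shown are just the ones from above):

import json
import h5py

with h5py.File("filename.h5", "a") as f:
    # the table's metadata dict goes into the HDF5 attributes (dict-like anyway).
    f.attrs["metadata"] = json.dumps({"filename": "filename1.csv",
                                      "localfile": "working_dir/tbl_0.h5"})
    # read it back when the table is reopened.
    metadata = json.loads(f.attrs["metadata"])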

I keep writing h5 because I like HDF5's SWMR (single-writer, multiple-reader) mode for multiprocessing, in contrast to SQLite's blocking mode.

This brings me to some missing features:

  • Table.from_file(filename..., limit=100) should return only the head (first 100 rows) of the file.
  • Table.from_file(filename..., columns=['a','c','d']) could return only the columns a, c, d (not b, e, f, ...).
  • Table.from_file(filename..., columns=['a','c','d'], datatypes=[int,str,date], error=None) could return the columns a, c, d with the corresponding datatypes and use None for exceptions.
  • Table.to_hdf5(filename=None, ...) with filename=None is an alias for to_hdf5 in tempfile mode.
  • Table.from_file(....) should print the file location if use_disk==True.

Consequences?

  1. This will require that the StoredColumn inherits the Table's localfile for data access, but otherwise there's no problem.

  2. The file_reader function find_format can run in parallel, as each worker can read the h5 file with focus on a single column of values and return the appropriate data format in isolation.

  3. When exiting, an atexit hook is required for flush() and close() so that the h5 file is closed properly (see the sketch after this list).

  4. When opening a file that already exists, the loading time should be limited to the time it takes to read the metadata of the h5 file as everything else already is in place.
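
A minimal sketch of the atexit hook from consequence 3, assuming the open h5py handles are collected in a module-level list (illustrative names only):

import atexit
import h5py

_open_files = []  # every handle registered here is flushed and closed at interpreter exit.

def _close_all():
    for f in _open_files:
        try:
            f.flush()
            f.close()
        except Exception:
            pass  # already closed.

atexit.register(_close_all)

f = h5py.File("working_dir/tbl_0.h5", "a")
_open_files.append(f)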

@root-11 root-11 self-assigned this Feb 5, 2022

root-11 commented Feb 6, 2022

I'm starting the branch master --> hdf5.

The complexity will drop a lot by doing the following:

  1. StoredList and Common column vanish. Thereby the simplest data structure becomes a table with one column.
  2. The tables use the io module for creating in-memory tables with HDF5 as the backend. This means that dropping to disk only requires replacing the io module with a Python file handle.
  3. The metadata of the table is stored directly in the HDF5 file.
  4. As io.BytesIO is automatically closed when Python garbage-collects it, unclosed files can't hang around; thereby atexit doesn't need to do anything. I only need to add def __del__(self): self._h5f.close()
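
A minimal sketch of points 2 and 4, assuming h5py's support for file-like objects (h5py >= 2.9) provides the in-memory backend (class and attribute names are illustrative):

import io
import h5py

class MemoryTable:
    """Illustrative only: an HDF5-backed table living in an io.BytesIO buffer."""

    def __init__(self):
        self._buffer = io.BytesIO()               # swap for open(path, "w+b") to drop to disk.
        self._h5f = h5py.File(self._buffer, "w")  # HDF5 on top of a file-like object.
        self._h5f.attrs["metadata"] = "{}"        # the table's metadata lives in the file itself.

    def __del__(self):
        self._h5f.close()                         # the buffer is collected along with the object.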


root-11 commented Feb 13, 2022

I have mapped out all the requirements to continue the hdf5 branch in this notebook and think I've got it all covered.

Datatypes = check.
Performance = check.


root-11 commented Mar 9, 2022

image


root-11 commented Mar 9, 2022

Some notes:

  • Tablite currently replicates tables the naive way. This is very wasteful, as adding a column {4} to create a new table recreates the columns from the source {1,2,3} and adds the new column {4}.

  • Another issue is that tablite doesn't support multiprocessing. Everything happens in Python's single process.

To overcome these issues, I'd like to implement a TaskManager that holds a shared_memory block containing all columns.

As this "global name space" is made for multiprocessing, the the TaskManager (main-process) can delegate tasks to a multiprocessing pool where n-1 using messages containing no more than:

  • address of the shared memory block
  • task description (join, filter, ..., custom operation)
  • shared memory address space for the result.

The work can be sliced in various ways. As we expect each operation to be atomic and incremental, the work can be split by cpu_count. Hereby, filtering N million rows of data can be divided evenly amongst P processors.
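
A minimal sketch of that splitting, assuming the rows are divided into near-equal (start, stop) slices, one per worker (names are illustrative):

from multiprocessing import Pool, cpu_count

def make_slices(n_rows, n_workers):
    # split n_rows into n_workers (start, stop) slices of near-equal size.
    step, remainder = divmod(n_rows, n_workers)
    slices, start = [], 0
    for i in range(n_workers):
        stop = start + step + (1 if i < remainder else 0)
        slices.append((start, stop))
        start = stop
    return slices

def task(slice_):
    # a worker would attach to the shared memory block here and
    # run the task description (join, filter, ...) on rows [start, stop).
    start, stop = slice_
    return stop - start

if __name__ == "__main__":
    workers = max(1, cpu_count() - 1)   # n-1 workers; the main process is the TaskManager.
    with Pool(workers) as pool:
        results = pool.map(task, make_slices(1_000_000, workers))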

Data Import

We can load data using tablite's file readers, reading CSV data in chunks (already implemented). Each worker can load a different chunk.

Concatenate <--- Implemented 2022/03/13

Concatenation of tables with identical datatypes is trivial, insofar as the TaskManager maintains the global namespace of columns:

tbl_1 = {'A': ref_1, 'B': ref_2}  # columns 1 & 2
tbl_2 = {'A': ref_3, 'B': ref_4}  # columns 3 & 4
tbl_4 = tbl_1 + tbl_2  == {'A': [ref_1, ref_3], 'B': [ref_2, ref_4]}   # strict order of references.

tbl_4.columns == ['A','B'] # strict order of headings.

Filter <--- Implemented 2022/03/13 as Table.__getitem__(...)

If the new "column" merely contains the indices of the filtered result, the output of the filtering process becomes a simple shared array booleans: True/False with reference to the columns from the global namespace. This is well aligned to best practices from numpy where booleans masks are used.

tbl_3 = {1,3,4} & index

To compute a further filter that uses tbl_3, the new bit array merely needs to be applied subject to tbl_3's index.
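
A minimal numpy sketch of the boolean-mask idea (the column values are illustrative):

import numpy as np

column_a = np.array([1, 5, 3, 8, 2])   # a column from the global namespace.
mask_1 = column_a > 2                   # first filter: a shared array of booleans.

# a further filter only needs a new mask combined with the existing one; no data is copied.
mask_2 = mask_1 & (column_a < 8)

filtered = column_a[mask_2]             # materialised only on demand: array([5, 3])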

Sort

Sort requires a sort index. The simplest way is to do this in a single proc.
Things that look the same have the same index.
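
A minimal numpy sketch of a single-proc sort index, where identical values share the same index:

import numpy as np

column = np.array(["b", "a", "c", "a", "b"])

sort_index = np.argsort(column, kind="stable")               # the order in which to read the rows.
unique, same_index = np.unique(column, return_inverse=True)
# same_index == [1, 0, 2, 0, 1]: things that look the same have the same index.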

Groupby & Pivot table.

Multikey groupby can happen in three steps (a sketch follows below):

  1. Create a sort index (int32, single proc)
  2. Split the sort index into blocks of work.
  3. As the keys are unique, write conflicts are impossible, so all procs can update the shared array concurrently (if this doesn't work, then create aggregate columns for each proc)

See numpy-groupies for low level implementations that utilise numpy, numba_jit, etc. for acceleration.

NB> The pivot table is best implemented as a sparse array and interpreted upon demand. Most pivot tables have data in less than 25% of their cells.
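
A minimal numpy sketch of steps 1 and 2 (a production version would lean on numpy-groupies as noted above); the keys and values are illustrative:

import numpy as np

keys = np.array(["a", "b", "a", "c", "b", "a"])
values = np.array([1, 2, 3, 4, 5, 6])

order = np.argsort(keys, kind="stable")   # step 1: the sort index (single proc).
sorted_keys = keys[order]
sorted_vals = values[order]

# step 2: block boundaries, one block per key; the blocks can be handed to separate procs.
boundaries = np.flatnonzero(np.r_[True, sorted_keys[1:] != sorted_keys[:-1]])
sums = np.add.reduceat(sorted_vals, boundaries)
# sorted_keys[boundaries] == ['a', 'b', 'c'] and sums == [10, 7, 4]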

Join

The join operation can happen in 3 steps:

  1. Compute a histogram of keys using groupby
  2. Compute the new table size by calculating the sum-product of key pairs (this will differ between inner, left and outer join)
  3. Compute the pair of indices for the new output (2 x int32).

The pair of indices can now be looked up on demand.
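
A minimal sketch of steps 1-3 for an inner join, assuming each side's key histogram maps a key to its row indices (illustrative data):

from collections import defaultdict

left_keys = ["a", "b", "a", "c"]
right_keys = ["a", "a", "c", "d"]

# step 1: a histogram of keys per side (key -> row indices).
left_index, right_index = defaultdict(list), defaultdict(list)
for i, key in enumerate(left_keys):
    left_index[key].append(i)
for j, key in enumerate(right_keys):
    right_index[key].append(j)

# step 2: the inner-join size is the sum-product of matching key counts.
size = sum(len(left_index[k]) * len(right_index[k]) for k in left_index.keys() & right_index.keys())

# step 3: the output is just the pair of row indices per match, looked up on demand.
pairs = [(i, j)
         for k in left_index.keys() & right_index.keys()
         for i in left_index[k]
         for j in right_index[k]]
# 'a': 2 x 2 rows, 'c': 1 x 1 row  ->  size == 5 == len(pairs)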

Exceeding memory limits

For the TaskManager to assign work to the workers, a topological sort is required to assure order of execution between tasks.
Even in ".ipynb notebook" mode, where the user may execute tasks out of order, the TaskManager must detect dependencies and raise exceptions at the level of instructions. Raising during a run in a remote process is not adequate.

If the TaskManager detects that the memory footprint exceeds 95% of the available memory, it can drop memory blocks to disk.

For this the TaskMap becomes beneficial: the items that were computed a long time ago (in CPU time) but aren't involved in any tasks can be dropped to disk. Intuitively it seems valid to let the TaskManager use a Least Recently Used (LRU) policy.

If a task by itself appears to exceed the memory footprint (for example, materialising a table with an outer join), the LRU-caching approach will drop completed parts of the outer join to disk as soon as they're passive.

Note that multiprocessing.shared_memory.SharedMemory(name=..., create=True, size=...) blocks:

  • can outlive the process that created them (good)
  • can be destroyed using unlink() (very good)

See example on python.org
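
A minimal sketch of that lifecycle, along the lines of the python.org example (the block name is illustrative):

import numpy as np
from multiprocessing import shared_memory

# main process (TaskManager): create a named block and place a column in it.
column = np.array([1, 2, 3, 4, 5], dtype="int64")
shm = shared_memory.SharedMemory(name="col_A", create=True, size=column.nbytes)
view = np.ndarray(column.shape, dtype=column.dtype, buffer=shm.buf)
view[:] = column

# worker: attach to the block by name; no copying, no pickling.
worker_shm = shared_memory.SharedMemory(name="col_A")
worker_view = np.ndarray((5,), dtype="int64", buffer=worker_shm.buf)
assert worker_view.sum() == 15
worker_shm.close()

# main process: the block outlives its creator until unlink() destroys it.
shm.close()
shm.unlink()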

Garbage collection.

When each table is deleted, we can override __del__ with an update to the TaskManager so its reference count is maintained. It will then be up to the TaskManager to determine whether the columns are deleted from memory or disk.
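
A minimal sketch of that hook, with a hypothetical TaskManager that keeps the reference counts:

class Table:
    def __init__(self, task_manager, column_refs):
        self._tm = task_manager
        self._column_refs = column_refs   # references into the shared column namespace.
        for ref in self._column_refs:
            self._tm.incref(ref)

    def __del__(self):
        # the TaskManager decides whether a column whose count reaches zero
        # is removed from memory, removed from disk, or kept.
        for ref in self._column_refs:
            self._tm.decref(ref)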


root-11 commented Mar 10, 2022

Multiprocessing - a batch of one.

image

Assume tbl_7 has the columns 'a','b','c','d','date','hour'. Using the signature below, it becomes possible to perform parallel computation with a simple function.

tbl_7.add_columns_from_function(function, source, index_on=None, func_args=None, func_kwargs=None)

:param function: a Callable
:param source: data source, e.g. ['a','b','c']
:param index_on: columns used to split the source into chunks, e.g. ['date', 'hour']
:param func_args: optional: arguments for the function
:param func_kwargs: optional: kwargs as a config for the function.

The interpretation of this function takes place in the TaskManager in the following steps:

  1. Check that the inputs are valid.
  2. Compute the index for chunks (index_on) if it exists.
  3. Create tasks for multiprocessing.
  4. Apply multiprocessing.Pool.map on the chunks.
  5. Collect the results.

A sample task list for batch-like operations would look like this:

tasklist = [(tbl_7,0,21,f),(tbl_7,21,56,f),(tbl_7,56,88,f),(tbl_7,88,102,f), ...]   

Each task contains (table, slice_start, slice_end, function, batch=True) and implies: select the slice from the table and give it to the function as a batch.

To process the chunks in batches of one (e.g. each row), the task list would be:

tasklist = [(tbl_7,0,21,f, False),(tbl_7,21,56,f, False),(tbl_7,56,88,f, False),(tbl_7,88,102,f, False), ...]   

The only requirement this approach imposes on the user is to assure that the arguments are acceptable as columns of data, such as:

def my_func(a=[1,2,3,4], b=[5,6,7,8], c=[2,4,6,8]):
    result = (
        [a[ix] + b[ix] * c[ix] for ix in range(len(a))],
    )
    return result  # returns a single column

And that the result is a tuple of columns. We do not restrict the usage in the signature from including the chunk index, nor from returning multiple columns. Here is another example:

def my_new_func(a=[1,2,3,4], b=[5,6,7,8], c=[2,4,6,8], hour=[6,12,18,0]):
    result = (
        [a[ix] + b[ix] * c[ix] for ix in range(len(a))],
        [h + 1 for h in hour],
    )
    return result  # returns two columns.


root-11 commented Mar 12, 2022

Which tasks happen in which process?

  1. Select columns (main) <-- Implemented 2022/03/14 as select __getitem__
  2. Concatenate tables (main) <-- Implemented 2022/03/13 as +, += & stack
  3. Copy tables (main) <--- Implemented 2022/03/13 as t.copy()
  4. create index (mpPool)
  5. Table.from_index(source_table, index) (mpPool)
  6. filter rows (mpPool) (create index: W, create tasks for datablocks: TM, exec.task: W)
  7. sort (mpPool) (create sort index, then tasks, then Table.from_index)
  8. groupby (mpPool) (create histogram: W, create merge task: TM, merge histogram: W)
  9. join (mpPool)
  10. data import (mpPool) <--- Implemented 2022/03/29
  11. data load (main) <--- Implemented 2022/03/29
  12. apply function (mpPool)
  13. Save to hdf5 (mpPool)


root-11 commented Mar 13, 2022

POC of memory manager now done.
POC of task manager now done.
Next: Put them together.


root-11 commented Mar 14, 2022

Notes on replacing missing values
image


root-11 commented Mar 15, 2022

Notes on importing vs loading.
image

What is the better way of handling imports?

Here is an overview of the file a.zip:

a.zip/  (hdf5 file to be imported)
	b.zip/   (h5 group)
		c.csv  (h5 group)
			col1   (dataset)
			col2
			....
		x.xlsx
			sheet1
				A   (dataset)
				B
				....
			sheet2
				C
				D   
	c.zip
		t.txt.h5
			colA
			colB
	d.csv
		col_1  (dataset)
		col_2
		col_3

I then create the hdf5 file a.zip.h5 as:

a.zip.h5 contents
 b.zip : <HDF5 group "/b.zip" (2 members)>
   c.csv : <HDF5 group "/b.zip/c.csv" (2 members)>
     config : {"import_as": "csv", "newline": "\r\n", "text_qualifier": "\"", "delimiter": ",", "first_row_headers": true, "columns": {"col1": "i8", "col2": "int64"}}
     col1 : <HDF5 dataset "col1": shape (6,), type "<i8">
     col2 : <HDF5 dataset "col2": shape (6,), type "<i8">
   x.xlsx : <HDF5 group "/b.zip/x.xlsx" (2 members)>
     sheet1 : <HDF5 group "/b.zip/x.xlsx/sheet1" (2 members)>
       A : <HDF5 dataset "A": shape (2,), type "|S6">
       B : <HDF5 dataset "B": shape (3,), type "|S8">
     sheet2 : <HDF5 group "/b.zip/x.xlsx/sheet2" (2 members)>
       C : <HDF5 dataset "C": shape (200,), type "<i8">
       D : <HDF5 dataset "D": shape (200,), type "<i8">
 c.zip : <HDF5 group "/c.zip" (1 members)>
   t.txt : <HDF5 group "/c.zip/t.txt" (1 members)>
     logs : <HDF5 dataset "logs": shape (100,), type "<f4">
 d.csv : <HDF5 group "/d.csv" (3 members)>
   col_1 : <HDF5 dataset "col_1": shape (9,), type "<f4">
   col_2 : <HDF5 dataset "col_2": shape (9,), type "<f4">
   col_3 : <HDF5 dataset "col_3": shape (9,), type "<f4">

so that the hdf5 file reflects the zip file's structure.

If the size or config changes, the file is re-imported.

**In retrospect I think this is a bad idea. The fact that this could work doesn't mean it should.**

CONCLUSION: .txt, .csv, .xlsx and .h5 files will be supported. Users can unzip their files before use.


root-11 commented Mar 17, 2022

Only show importable, compatible formats.


root-11 commented Mar 30, 2022

We have a test like this in the test suite:

table1 = Table()
table1.add_column('A', data=[1, 2, 3])
table1.add_column('B', data=['a', 'b', 'c'])

table2 = table1.copy()  # kept for checking.

# Now we change table1:

table1['A', 'B'] = [ [4, 5, 6], [1.1, 2.2, 3.3] ]

# then we check table2 isn't broken.

assert table2['A'] == [1, 2, 3]
assert table2['B'] == ['a', 'b', 'c']

My question: Is this test still valid?
My mind was set on tables becoming immutable, but I can handle the deduplication in the memory manager, so there's no real problem in permitting this. It only removes the constraint that tables should be immutable.

Conclusions, a few hours later...

Implemented. Tables are now mutable and permit updates. These will be quite slow, so the most efficient approach is to do slice updates rather than update individual values.

Fastest, of course, is to create a new column and drop the old one.


root-11 commented Jul 8, 2022

implemented in commit #4844bc87

@root-11 root-11 closed this as completed Jul 8, 2022