In [23]:
import numpy as np
import pandas as pd
from pathlib import Path
import datetime
import sys
import tempfile
from dateutil import tz

from ratschlab_common.io.bigmatrix import TablesBigMatrixReader, TablesBigMatrixWriter

# Big Matrix Usage Example

`Bigmatrix` represents an abstraction for out-of-core computation with large
matrices having some 'metadata' associated with the rows and columns. This
metadata could be for example some patient id and a timestamp for the rows
and a column name for each column, e.g. the name of features. The matrix itself
could represent the values of the features for patients at different times.

The motivation behind this abstraction is, that sometimes a dataset can be almost
represented as a matrix, i.e. has the same datatype across all columns, typically
some float or int. But a few columns represent something else, e.g. time information
or ids. The idea is to handle these metadata separately from the rest. This way
the entire dataset can be handled more efficiently and easily.

`ratschlab_common.io.bigmatrix` serializes all the data to disk using HDF5 via the `pytables` library. A file generated by this module consists of three nodes: one for the matrix, one for the column description and one for the row description. The matrix is written directly by `pytables` whereas the metadata is written by `pandas`. This can be changed to `pytables` as well, for example in case `pandas` would use too much memory for processing the metadata.  

## Generating Fake Data

First, we'll generate some fake data and write it incrementally using `TablesBigMatrixWriter`.

The fake data consists of numerical records of several patients, where each record has a timestamp and a patient id associated. These two columns are stored as row descriptions (i.e. a dataframe). 

Note, that in principle, the time information could be integrated into the numerical matrix. However, indexing and time zone handling etc would have to be done manually. In this approach, we let `pandas` take care of this.

In [24]:
seed = 42**2 # make it deterministic (at least when running from top to bottom)

np.random.seed(seed)

## config:
num_patients = 50
patient_ids = np.random.randint(100000, size=num_patients)

rows_per_patient = 1000
cols = 150

start_date = datetime.datetime(2018, 3, 1)

In [25]:
data_size_in_memory = 8*num_patients*rows_per_patient*cols/1024**3 #GB
data_size_in_memory

0.05587935447692871

In [26]:
def fake_patient_data(pid, rows, cols):
    """
    Generates fake data for one patient with a given patient id
    """
    ids = np.full((rows,), pid)
    
    # a measurement all 5 minutes
    timestamps = pd.date_range(pd.to_datetime(start_date), periods=rows, freq='5min')
    
    row_desc = pd.DataFrame.from_dict({'id' : ids, 'ts' : timestamps})
    
    data = np.random.uniform(size=(rows, cols)) + pid # biasing the values, s.t. we know we got the data from the right patient
    
    return data, row_desc

In [27]:
temp_dir = tempfile.TemporaryDirectory()

output_path = Path(temp_dir.name, 'fake_patient_data.h5')
output_path

PosixPath('/var/folders/ry/d9fpyr5d21xgbjyrl084ssqh0000gn/T/tmptkcxmkl4/fake_patient_data.h5')

Also columns can be described using a data frame. Typically, it would contain just the column name. Here we also add some comment.

In [28]:
col_desc = pd.DataFrame(
    {'name': ["my_col{}".format(c) for c in range(0, cols)],
     'comment': ["You could add some comment for column {}".format(c) for c in range(0, cols)]
    })

col_desc.head()

Unnamed: 0,comment,name
0,You could add some comment for column 0,my_col0
1,You could add some comment for column 1,my_col1
2,You could add some comment for column 2,my_col2
3,You could add some comment for column 3,my_col3
4,You could add some comment for column 4,my_col4


In order to allow efficient access and compression when writing data to disk to HDF5 files, data can be split into "chunks".
See also https://support.hdfgroup.org/HDF5/doc/Advanced/Chunking/index.html

Poorly chosen chunksizes/shapes can have a significantly negative impact on performance, so it's usually worth experimenting a bit. The `ptrepack` tool from the `pytables` package is very useful for this purpose in general. `ratschlab-common` comes with a specialized tool for `bigmatrix` files (see example at the end of the notebook).

In general, when writing HDF5 files incrementally, it is important to select a chunksize/shape manually, as otherwise the HDF5 library (e.g. pytables) may choose an inappropriate size depending on the first batch of data written.

For some advice on how to choose the chunksize, see http://www.pytables.org/usersguide/optimization.html#understanding-chunking
Since blosc compression is currently used as default one should aim at something in the ballpark of 500kb of data per chunk.

In [29]:
chunkshape = (1000, 50) # nicely aligned with the fake data (doesn't need to be the case)

chunksize_kb = np.dtype('float').itemsize * chunkshape[0] * chunkshape[1]/1024
chunksize_kb

390.625

Since we assume we are dealing with a lot of data, we are going to write the patient data sequentially.

Note, that only rows can be appended, but not columns.

In [30]:
writer = TablesBigMatrixWriter()

if output_path.exists():
    output_path.unlink()
    
for pid in patient_ids:
    data, row_desc = fake_patient_data(pid, rows_per_patient, cols)
    
    writer.write_or_append(output_path, data, row_desc, col_desc, chunkshape=chunkshape)
    

In [31]:
!ls -l $output_path

-rw-r--r--  1 marc  staff  38409169 May 24 10:20 /var/folders/ry/d9fpyr5d21xgbjyrl084ssqh0000gn/T/tmptkcxmkl4/fake_patient_data.h5


Below the groups in the newly created HDF5 file

In [32]:
!ptdump $output_path

/ (RootGroup) ''
/data (EArray(50000, 150), fletcher32, shuffle, blosc:lz4(5)) ''
/col_descr (Group) ''
/col_descr/table (Table(150,), shuffle, blosc:lz4(5)) ''
/row_descr (Group) ''
/row_descr/table (Table(50000,), shuffle, blosc:lz4(5)) ''


## Reading Back Data

In [33]:
# reading back data of a "random" patient
some_id = patient_ids[-1]
some_id

26710

In [34]:
# getting all the data of some patient
with TablesBigMatrixReader(output_path) as reader:
    # first getting the row indices of the data belonging to the given patient
    row_indices = reader.get_row_indices('id == {}'.format(some_id))
    
    # accessing the matrix
    data_tmp = reader[row_indices, :]
data_tmp

array([[26710.20143677, 26710.55182748, 26710.87855975, ...,
        26710.81713081, 26710.73815005, 26710.11845951],
       [26710.78651242, 26710.69449985, 26710.29312988, ...,
        26710.6971596 , 26710.43853235, 26710.15546655],
       [26710.84996235, 26710.45633331, 26710.99427889, ...,
        26710.87577065, 26710.38067227, 26710.93668617],
       ...,
       [26710.3633043 , 26710.80679326, 26710.86311316, ...,
        26710.04492171, 26710.37460093, 26710.53402499],
       [26710.65226909, 26710.07424714, 26710.88747023, ...,
        26710.22762388, 26710.1339891 , 26710.48514644],
       [26710.02089059, 26710.13063079, 26710.80944825, ...,
        26710.5185127 , 26710.93133834, 26710.50079785]])

We can also get data on a specific day. The code uses pandas' `dataframe.query` function. Query syntax is a bit cumbersome...

Note that '2018-03-02' implicitly gets converted to '2018-03-02 00:00:00' local time!

In [35]:
with TablesBigMatrixReader(output_path) as reader:
    row_indices = reader.get_row_indices("id == {} & ts >= '2018-03-02' & ts < '2018-03-03'".format(some_id))
    
    row_desc = reader.row_desc()
    timestamps = row_desc.loc[row_indices, 'ts']
    
    data_tmp = reader[row_indices, :]
timestamps[1:10], data_tmp[1:10, ]

(49289   2018-03-02 00:05:00
 49290   2018-03-02 00:10:00
 49291   2018-03-02 00:15:00
 49292   2018-03-02 00:20:00
 49293   2018-03-02 00:25:00
 49294   2018-03-02 00:30:00
 49295   2018-03-02 00:35:00
 49296   2018-03-02 00:40:00
 49297   2018-03-02 00:45:00
 Name: ts, dtype: datetime64[ns],
 array([[26710.90660873, 26710.8418034 , 26710.71130172, ...,
         26710.28144534, 26710.50091314, 26710.55610162],
        [26710.72233082, 26710.00978424, 26710.93947081, ...,
         26710.28969285, 26710.95075967, 26710.31592936],
        [26710.22408737, 26710.19909297, 26710.86823822, ...,
         26710.44479405, 26710.02135051, 26710.70572147],
        ...,
        [26710.312206  , 26710.72083411, 26710.4207162 , ...,
         26710.16075147, 26710.0129967 , 26710.90915809],
        [26710.40858287, 26710.4680696 , 26710.54513204, ...,
         26710.82508201, 26710.99749598, 26710.82239922],
        [26710.41591162, 26710.45859962, 26710.34461571, ...,
         26710.86537393, 26710

In [36]:
# getting only a few columns of data for some patient
with TablesBigMatrixReader(output_path) as reader:
    row_indices = reader.get_row_indices('id == {}'.format(some_id))
    col_indices = reader.get_col_indices_by_name(['my_col5', 'my_col6'])
    
    data_tmp = reader[row_indices, col_indices]
data_tmp

array([[26710.29889554, 26710.66328001],
       [26710.38902207, 26710.82468611],
       [26710.91017141, 26710.85773244],
       ...,
       [26710.9989812 , 26710.22916032],
       [26710.85948349, 26710.57984862],
       [26710.30660865, 26710.45473952]])

In [37]:
def median_of_column(path, col_name):
    # calculating the median over an entire column (i.e. over all patients)
    with TablesBigMatrixReader(path) as reader:
        col_indices = reader.get_col_indices_by_name([col_name])

        return np.median(reader[:, col_indices])

In [38]:
%time median_of_column(output_path, 'my_col7')

CPU times: user 65.3 ms, sys: 7.15 ms, total: 72.4 ms
Wall time: 72.2 ms


40353.499702672576

### Tuning the chunkshape

`ratschlab_common` includes a simple command line tool to tune the chunkshape of an existing h5 file written by `TablesBigMatrixWriter`. It is a wrapper around the `ptrepack` utility (see also http://www.pytables.org/usersguide/utilities.html#ptrepack). It will rewrite data to a new file using the specified chunkshape for the matrix and by default just copy over the metadata to the new file.

In [39]:
!bigmatrix_repack --help

Usage: bigmatrix_repack [OPTIONS] SOURCE_FILE DEST_FILE

  Repacks bigmatrix h5 files using pytables' ptrepack.

  Main use case is to experiment with different chunkshapes for the matrix.

Options:
  --matrix-chunkshape TEXT        Chunkshape of the matrix. Can be a tuple
                                  like "(10, 20)" or "keep" or "auto"
  --matrix-repack-options TEXT    options passed to ptrepack for the matrix
                                  node. If set, the matrix-chunkshape option
                                  is ignored.
  --row-meta-repack-options TEXT  options passed to ptrepack for the row
                                  metadata node.
  --col-meta-repack-options TEXT  options passed to ptrepack for the column
                                  metadata node.
  --force                         If set, destination file will be overwritten
                                  if it exists
  --help                          Show this message and exit.


Changing the chunk shape for columnar access pattern using
slabs that thin is a bit extreme. But we can roughly read 3-4 times faster. Note, that this may not always work that well also writing times are larger.

In [40]:
origin = str(output_path)
dest = Path(temp_dir.name, 'repacked.h5')

!bigmatrix_repack --matrix-chunkshape "(5000,1)" --force $output_path $dest

In [41]:
!ptdump -v $dest

/ (RootGroup) ''
/data (EArray(50000, 150), fletcher32, shuffle, blosc:lz4(5)) ''
  atom := Float64Atom(shape=(), dflt=0.0)
  maindim := 0
  flavor := 'numpy'
  byteorder := 'little'
  chunkshape := (5000, 1)
/col_descr (Group) ''
/col_descr/table (Table(150,), shuffle, blosc:lz4(5)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": StringCol(itemsize=41, shape=(2,), dflt=b'', pos=1)}
  byteorder := 'little'
  chunkshape := (728,)
/row_descr (Group) ''
/row_descr/table (Table(50000,), shuffle, blosc:lz4(5)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Int64Col(shape=(1,), dflt=0, pos=1),
  "values_block_1": Int64Col(shape=(1,), dflt=0, pos=2)}
  byteorder := 'little'
  chunkshape := (2730,)


In [42]:
%time median_of_column(dest, 'my_col7')

CPU times: user 19.2 ms, sys: 6.12 ms, total: 25.3 ms
Wall time: 23.3 ms


40353.499702672576

In [43]:
temp_dir.cleanup() # clean up of temporary data