In [None]:
# default_exp dask_dataset

# Dask Datasets

> Module to build tensorflow datasets from Dask DataFrames.

In [None]:
# export
from dask import dataframe as dd
import pandas as pd
from AugmenTF.text.text_preprocessing import preprocess_text
from AugmenTF.core import *
from random import shuffle
import sys
import random
import tensorflow as tf



In [None]:
# hide
class IdxMapper():
    '''
    Stateful callable that hands out consecutive integer positions.

    Each call receives an object exposing `.index.size` and returns a list of
    that many integers, continuing from where the previous call stopped.
    '''
    def __init__(self):
        # Next position to hand out.
        self.idx = 0

    def __call__(self, df):
        first = self.idx
        last = first + df.index.size
        # Remember where the next frame has to start.
        self.idx = last
        return list(range(first, last))

In [None]:
# hide
def ddf_make_index_monotonic(ddf, persist=True):
    '''
    Returns a copy of `ddf` whose index is the running row number 0, 1, 2, ...

    ddf: frame to re-index. It is left untouched: the helper column is added
         with `assign`, which returns a new frame, instead of item assignment,
         which would add an `idx` column to the caller's frame as well.
    persist: accepted for backward compatibility; currently not used.

    Returns: a new frame indexed by `idx`.
    '''
    # A column of ones whose cumulative sum (minus one) is the row position.
    numbered = ddf.assign(idx=1)
    numbered = numbered.assign(idx=numbered.idx.cumsum() - 1)
    # The running count is already ascending, so no shuffle/sort is requested.
    return numbered.set_index('idx', sorted=True)

In [None]:
# hide
def cull_empty_partitions(df):
    '''
    Returns `df` without its zero-row partitions.

    The length of every partition is computed up front. If no partition is
    empty the input is returned as-is; otherwise a new frame is assembled from
    the non-empty delayed partitions, using an empty partition as `meta`.
    '''
    partition_sizes = list(df.map_partitions(len).compute())
    delayed_parts = df.to_delayed()
    kept_parts = []
    empty_part = None
    for position, size in enumerate(partition_sizes):
        if size != 0:
            kept_parts.append(delayed_parts[position])
            continue
        # Keep a handle on the most recent empty partition to serve as meta.
        empty_part = df.get_partition(position)
    if empty_part is None:
        return df
    return dd.from_delayed(kept_parts, meta=empty_part)

In [None]:
# export
# Maps a fragment of a dtype name to the TensorFlow dtype used for it.
# `_map_dtypes` tests each fragment with `in` against the dtype's string name
# and keeps the last fragment that matches, so the order of entries matters.
TYPE_MAPPING = dict([
    ('str', tf.string),
    ('object', tf.string),
    ('int', tf.int32),
    ('float', tf.float32),
    ('cat', tf.int32),
])

# Presumably read by the notebook export tooling to expose the constant.
_all_ = ['TYPE_MAPPING']

def _map_dtypes(df):
    '''```
    Returns a dictionary keyed by column name.

    Each value is the TYPE_MAPPING entry whose key occurs in the string name
    of the column's dtype (the last matching key wins). Columns whose dtype
    name matches no key keep that dtype name as a plain string.
    ```'''
    resolved = {}
    for column, dtype in df.dtypes.to_dict().items():
        dtype_name = str(dtype)
        # Fall back to the raw dtype name when nothing in TYPE_MAPPING matches.
        chosen = dtype_name
        for fragment, tf_type in TYPE_MAPPING.items():
            if fragment in dtype_name:
                chosen = tf_type
        resolved[column] = chosen
    return resolved

In [None]:
# export
class DaskDataset():
    '''
    Wraps a dask DataFrame so rows can be fetched by position, written to
    parquet, and streamed into a `tf.data.Dataset`.
    '''

    def __init__(self, df):
        '''
        Class to manage Dask dataframes as Tensorflow datasets.
        
        
        df: any dask.DataFrame
        
        '''
        assert isinstance(df, dd.DataFrame), '''
        df must be a dask.dataframe.DataFrame
        '''
        self.df = ddf_make_index_monotonic(df)
        # Materialise the index once; it backs positional lookup in __getitem__
        # and its length is the row count, so no second pass is needed.
        self.index = self.df.index.compute().tolist()
        self.n_rows = len(self.index)
        # Count columns on the re-indexed frame so `shape` describes `self.df`,
        # where `idx` is the index rather than a column.
        self.n_cols = len(self.df.columns)
        self.shape = (self.n_rows, self.n_cols)
    
    def __len__(self):
        '''Number of rows in the dataset.'''
        return self.n_rows
    
    def __getitem__(self, idx):
        '''
        Returns the computed row(s) at position `idx`.

        `idx` is looked up in the materialised index list first, then used as
        a label for `.loc` on the dask frame.
        '''
        _idx = self.index[idx]
        return self.df.loc[_idx].compute()
    
    def select_columns(self, x_cols=None, y_cols=None):
        '''
        Select which columns you want as your X and y.

        x_cols: column name, or list/tuple of column names, used as features.
        y_cols: column name, or list/tuple of column names, used as labels.
        '''
        self.x_cols = x_cols
        self.y_cols = y_cols
    
    def to_parquet(self, path, partition_cols=None):
        """```
        Saves the self.df as parquet.
        
        Options:
        path: str or Path, path to where you want the saved data to live.
        partition_cols: str or list of columns on which to partition the data.
        ```"""
        # Imported locally so this method does not rely on `Path` being
        # provided by a star import elsewhere in the module.
        from pathlib import Path

        path = Path(path)
        if not path.parent.exists():
            path.parent.mkdir(parents=True)
        # A single column name is accepted and wrapped into a list.
        if partition_cols and isinstance(partition_cols, str):
            partition_cols = [partition_cols]
        self.df.to_parquet(str(path), partition_on=partition_cols, write_index=False)
        
    def __repr__(self):
        return (
            f'{self.__class__.__name__}\n' +
            f'Columns and Datatypes\n' +
            f'{self.df.dtypes}\n'  +
            f'Shape: {self.shape}'
        )

    @staticmethod
    def _select(source, cols):
        '''
        Looks `cols` up in `source` (a row or a dict).

        A list/tuple of names gives a tuple of values in the same order; a
        single name gives the single value.
        '''
        if isinstance(cols, (list, tuple)):
            return tuple(source[col] for col in cols)
        return source[cols]

    def _iter_examples(self):
        '''
        Yields one `(x, y)` pair for every row of `self.df`, in `iterrows` order.

        The yield lives inside the loop so every row is emitted, not only the
        last one.
        '''
        for _, row in self.df.iterrows():
            yield self._select(row, self.x_cols), self._select(row, self.y_cols)
    
    def get_tf_dataset(self, randomize_order=True):
        '''
        Returns a tf.data.Dataset of the data.
        Must have x_cols and y_cols selected using DaskDataset.select_columns

        randomize_order: accepted for backward compatibility; currently not
                         used, rows are produced in `iterrows` order.

        Raises: ValueError if x_cols or y_cols has not been selected.
        '''
        
        # confirm that x and y have been set (select_columns defaults to None)
        if getattr(self, 'x_cols', None) is None or getattr(self, 'y_cols', None) is None:
            raise ValueError('Must use select_columns to select data and labels')
        
        # Get the datatypes in the same nested structure the generator yields:
        # a tuple of dtypes for a list/tuple selection, one dtype otherwise.
        _types = _map_dtypes(self.df)
        x_types = self._select(_types, self.x_cols)
        y_types = self._select(_types, self.y_cols)
        
        return tf.data.Dataset.from_generator(self._iter_examples, (x_types, y_types))

In [None]:
# export
def make_random_splits(df, train_frac=0.8, val_frac=0.1, test_frac=0.1, stratify_by=None, random_state=None):
    '''```
    Splits your data into train, validation, and test sets.
    This function also supports stratification by one column.
    
    Options:
    df: any dask.DataFrame
    train_frac: float, fraction of data you'd like to be in your train df
    val_frac: float, fraction of data you'd like to be in your validation df
    test_frac: float, fraction of data you'd like to be in your test df.
               if None, will only return train and val datasets.
    stratify_by: str, col you'd like to use to stratify your splits
    random_state: int or None, random state for your splits
    
    Returns: tuple of DataFrames, one per non-empty fraction, in the order
             train, val, test.
    ```'''

    assert isinstance(df, dd.DataFrame), '''
    df must be a dask.dataframe.DataFrame
    '''

    if stratify_by:
        assert stratify_by in df.columns

    # Fractions given as None (or 0) are dropped, so fewer splits come back.
    fracs = [frac for frac in [train_frac, val_frac, test_frac] if frac]
    # Compare with a tolerance: e.g. 0.7 + 0.2 + 0.1 is not exactly 1.0 in
    # floating point and would otherwise be rejected.
    assert abs(sum(fracs) - 1) < 1e-9, '''
    train_frac, val_frac, and test_frac must sum to 1.0
    '''

    if not stratify_by:
        splits = df.random_split(fracs, random_state=random_state)
        return tuple(part.reset_index(drop=True) for part in splits)

    # One bucket per requested split; every stratum contributes one piece to each.
    buckets = [[] for _ in fracs]
    values = df[stratify_by].unique().compute().tolist()
    grouped = df.groupby(stratify_by)
    for value in values:
        pieces = grouped.get_group(value).random_split(fracs, random_state=random_state)
        for bucket, piece in zip(buckets, pieces):
            bucket.append(piece)
    # Build the result in a fresh tuple rather than deleting from a list while
    # iterating over it; buckets that received nothing are skipped.
    return tuple(
        dd.concat(bucket, interleave_partitions=True).reset_index(drop=True)
        for bucket in buckets
        if bucket
    )

Below, we'll illustrate a basic workflow for making `tf.data.Dataset`s from `DaskDataset`s, including loading and splitting data, and making transformations.

In [None]:
from AugmenTF.text.text_preprocessing import preprocess_text

First, we'll load a CSV file into a pandas `DataFrame` and convert it into a Dask `DataFrame` with four partitions. Dask can also read CSV, Parquet, and other formats directly.

In [None]:
# Read the CSV with pandas, then wrap it as a Dask DataFrame with 4 partitions.
df = pd.read_csv('data/csv/20_newsgroups.csv')
ddf = dd.from_pandas(df, npartitions=4)

# Preview the first rows.
ddf.head()

Unnamed: 0,text,labels,label_names
0,From: lerxst@wam.umd.edu (where's my thing)\nS...,7,rec.autos
1,From: guykuo@carson.u.washington.edu (Guy Kuo)...,4,comp.sys.mac.hardware
2,From: twillis@ec.ecn.purdue.edu (Thomas E Will...,4,comp.sys.mac.hardware
3,From: jgreen@amber (Joe Green)\nSubject: Re: W...,1,comp.graphics
4,From: jcm@head-cfa.harvard.edu (Jonathan McDow...,14,sci.space


To illustrate a simple transformation, we'll preprocess the text column.

In [None]:
# Run preprocess_text over every value of the text column and store the result
# back on ddf under the same column name.
ddf.text = ddf.text.map(preprocess_text)

In [None]:
# Preview the rows again to see the transformed text column.
ddf.head()

Unnamed: 0,text,labels,label_names
0,from : lerxst @ wam . umd . edu ( where ' s my...,7,rec.autos
1,from : guykuo @ carson . u . washington . edu ...,4,comp.sys.mac.hardware
2,from : twillis @ ec . ecn . purdue . edu ( tho...,4,comp.sys.mac.hardware
3,from : jgreen @ amber ( joe green ) \n subject...,1,comp.graphics
4,from : jcm @ head - cfa . harvard . edu ( jona...,14,sci.space


Next, we'll make one train/val split and stratify by the `"label_names"` column.

In [None]:
# 80/20 train/val split; test_frac=None means no test split is returned.
# Stratified on the label_names column, with random_state=42.
train, val = make_random_splits(ddf, .8, .2, None, 'label_names', random_state=42)

Below, we make `DaskDataset`s out of the train and val DataFrames.

In [None]:
# Wrap each split; the constructor re-indexes the frame via ddf_make_index_monotonic.
train_ds, val_ds = DaskDataset(train), DaskDataset(val)

We can easily index `DaskDataset`s by position since they enforce monotonically increasing indices.

In [None]:
# Fetch the entry at position 1000; __getitem__ computes it before returning.
train_ds[1000]

Unnamed: 0_level_0,text,labels,label_names
idx,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1000,from : speedy @ engr . latech . edu ( speedy m...,8,rec.motorcycles


Next, to create a tensorflow Dataset, we have to set the X and y columns.

In [None]:
# Use the text column as X and the labels column as y on both datasets.
train_ds.select_columns(x_cols='text', y_cols='labels')
val_ds.select_columns(x_cols='text', y_cols='labels')

Finally, we can easily get tensorflow datasets from the train or val datasets.

In [None]:
# Build a generator-backed tf.data.Dataset from the training split.
tf_train = train_ds.get_tf_dataset()

In [None]:
# Take only the first batch (requested size 10) and stop iterating;
# x_batch and y_batch stay bound for the inspection cells below.
for x_batch, y_batch in tf_train.batch(10):
    break

In [None]:
# Inspect the features of the first batch.
x_batch

<tf.Tensor: shape=(1,), dtype=string, numpy=
array([b'from : rdippold @ qualcomm . com ( ron " asbestos " dippold ) \n subject : re : once tapped , your code is no good any more . \n originator : rdippold @ qualcom . qualcomm . com \n nntp - posting - host : qualcom . qualcomm . com \n organization : qualcomm , inc . , san diego , ca \n distribution : na \n lines : 8 \n \n random @ presto . uucp ( jeff w . hyche ) writes : \n > yes , " clipper " is a trademark of intergraph . its the risc chip used \n > in some of thier workstations . i wonder what intergraph is going to \n > do to this infringement on thier name sake ? \n \n probably keep quiet and take it , lest they get their kneecaps busted . \n - - \n good news . ten weeks from friday will be a good day .'],
      dtype=object)>

In [None]:
# Inspect the labels of the first batch.
y_batch

<tf.Tensor: shape=(1,), dtype=int32, numpy=array([11], dtype=int32)>

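**TODO:**

* Regardless of the batch size passed to `.batch()`, the `tf.data.Dataset` returned by `get_tf_dataset` only yields batches of 1 (see the `shape=(1,)` outputs above, produced with `batch(10)`).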