In [None]:
#|hide
#| eval: false
! [ -e /content ] && pip install -Uqq fastai  # upgrade fastai on colab

In [None]:
#|default_exp core

In [None]:
#|export
from __future__ import annotations
from fastai.torch_basics import *
from fastai.data.all import *
from fastai.tabular.all import *
import dask.dataframe as dd

In [None]:
#|hide
from nbdev.showdoc import *

# BigTabular core

> Basic functions to preprocess larger-than-memory tabular data with Dask before assembling it in `DataLoaders`.

## Initial preprocessing

Define Dask versions of the `make_date`, `add_datepart`, and `add_elapsed_times` functions defined in `tabular.core`. The `dask_make_date` function uses Dask's `to_datetime` function rather than the Pandas version. The `dask_add_datepart` and `dask_add_elapsed_times` functions just wrap `add_datepart` and `add_elapsed_times`, respectively, in the Dask `map_partitions` function.

In [None]:
#|export
def dask_make_date(ddf, date_field):
    "Convert `ddf[date_field]` to a datetime column with Dask's `to_datetime`; the column is assigned back onto `ddf`."
    as_datetime = dd.to_datetime(ddf[date_field], infer_datetime_format=True)
    ddf[date_field] = as_datetime

In [None]:
# Date strings are converted on `ddf` itself (no return value), so only the resulting dtype is checked.
df = pd.DataFrame({'date': ['2019-12-04', '2019-11-29', '2019-11-15', '2019-10-24']})
ddf = dd.from_pandas(df)
dask_make_date(ddf, 'date')
test_eq(ddf['date'].dtype, np.dtype('datetime64[ns]'))

In [None]:
#|export
def dask_add_datepart(ddf, field_name, prefix=None, drop=True, time=False):
    "Add date-derived feature columns for `field_name` by applying `add_datepart` to every partition of `ddf`"
    # `dask_make_date` assigns the converted column onto the `ddf` object that was passed in
    dask_make_date(ddf, field_name)
    add_parts = partial(add_datepart, field_name=field_name, prefix=prefix, drop=drop, time=time)
    return ddf.map_partitions(add_parts)

For example if we have a series of dates we can then generate features such as `Year`, `Month`, `Day`, `Dayofweek`, `Is_month_start`, etc as shown below:

In [None]:
# The second date is missing (`None`) to show how date parts behave for null values.
df = pd.DataFrame({'date': ['2019-12-04', None, '2019-11-15', '2019-10-24']})
ddf = dd.from_pandas(df)
ddf = dask_add_datepart(ddf, 'date')
ddf.head()

Unnamed: 0,Year,Month,Week,Day,Dayofweek,Dayofyear,Is_month_end,Is_month_start,Is_quarter_end,Is_quarter_start,Is_year_end,Is_year_start,Elapsed
0,2019.0,12.0,49.0,4.0,2.0,338.0,False,False,False,False,False,False,1575418000.0
1,,,,,,,False,False,False,False,False,False,
2,2019.0,11.0,46.0,15.0,4.0,319.0,False,False,False,False,False,False,1573776000.0
3,2019.0,10.0,43.0,24.0,3.0,297.0,False,False,False,False,False,False,1571875000.0


In [None]:
#|hide
# With the default `drop=True` the original `date` column is replaced by the generated parts.
test_eq(ddf.columns, ['Year', 'Month', 'Week', 'Day', 'Dayofweek', 'Dayofyear', 'Is_month_end', 'Is_month_start',
            'Is_quarter_end', 'Is_quarter_start', 'Is_year_end', 'Is_year_start', 'Elapsed'])

# Exactly one row (the one built from the `None` date) has a missing `Elapsed` value
test_eq(ddf[ddf.Elapsed.isna()].compute().shape, (1, 13))

# Test that week dtype is consistent with other datepart fields
test_eq(ddf['Year'].dtype, ddf['Week'].dtype)

test_eq(pd.api.types.is_numeric_dtype(ddf['Elapsed']), True)

In [None]:
#|hide
# Single-row frame where the date column comes last, used by the column-order test in the next cell.
df = pd.DataFrame({'f1': [1.],'f2': [2.],'f3': [3.],'f4': [4.],'date':['2019-12-04']})
ddf = dd.from_pandas(df)
ddf = dask_add_datepart(ddf, 'date')
ddf.head()

Unnamed: 0,f1,f2,f3,f4,Year,Month,Week,Day,Dayofweek,Dayofyear,Is_month_end,Is_month_start,Is_quarter_end,Is_quarter_start,Is_year_end,Is_year_start,Elapsed
0,1.0,2.0,3.0,4.0,2019,12,49,4,2,338,False,False,False,False,False,False,1575418000.0


In [None]:
#|hide
# Test order of columns when date isn't in first position:
# the existing feature columns stay in front and the date parts are appended after them
test_eq(ddf.columns, ['f1', 'f2', 'f3', 'f4', 'Year', 'Month', 'Week', 'Day',
            'Dayofweek', 'Dayofyear', 'Is_month_end', 'Is_month_start',
            'Is_quarter_end', 'Is_quarter_start', 'Is_year_end', 'Is_year_start', 'Elapsed'])

# Test that week dtype is consistent with other datepart fields
test_eq(ddf['Year'].dtype, ddf['Week'].dtype)

In [None]:
#|export
def dask_add_elapsed_times(ddf, field_names, date_field, base_field):
    "Convert `date_field` to datetime, then apply `add_elapsed_times` to every partition of `ddf`"
    dask_make_date(ddf, date_field)
    # NOTE(review): every partition is handled on its own, so elapsed times presumably do not
    # carry over partition boundaries -- confirm this is acceptable for multi-partition data.
    per_partition = partial(add_elapsed_times, field_names=field_names, date_field=date_field, base_field=base_field)
    return ddf.map_partitions(per_partition)

In [None]:
# Elapsed times are computed for the boolean `event` column, using `date` as the time axis and `base` as `base_field`.
df = pd.DataFrame({'date': ['2019-12-04', '2019-11-29', '2019-11-15', '2019-10-24'],
                   'event': [False, True, False, True], 'base': [1,1,2,2]})
ddf = dd.from_pandas(df)
ddf = dask_add_elapsed_times(ddf, ['event'], 'date', 'base')
ddf.head()

Unnamed: 0,date,event,base,Afterevent,Beforeevent,event_bw,event_fw
0,2019-12-04,False,1,5,0,1.0,0.0
1,2019-11-29,True,1,0,0,1.0,1.0
2,2019-11-15,False,2,22,0,1.0,0.0
3,2019-10-24,True,2,0,0,1.0,1.0


In [None]:
#|export
def dask_cont_cat_split(df, max_card=20, dep_var=None):
    "Split the column names of the Dask dataframe `df` into continuous and categorical ones, skipping `dep_var`."
    excluded = L(dep_var)
    cont_names, cat_names = [], []
    for label in df:
        if label in excluded: continue
        dtype = df[label].dtype
        if pd.api.types.is_integer_dtype(dtype):
            # Change for Dask compatibility: the cardinality is lazy and has to be computed explicitly
            is_cont = df[label].nunique().compute() > max_card
        else:
            # Floats are always continuous; every other non-integer dtype is categorical
            is_cont = pd.api.types.is_float_dtype(dtype)
        target = cont_names if is_cont else cat_names
        target.append(label)
    return cont_names, cat_names

We also define a Dask version of the `cont_cat_split` function. The only difference to the original function is calling `compute` on the Dask dataframe to determine the cardinality of the columns. This function works by determining if a column is continuous or categorical based on the cardinality of its values. If it is above the `max_card` parameter (or a `float` datatype) then it will be added to the `cont_names` else `cat_names`. An example is below:

In [None]:
# Example with simple numpy types
# Every integer column has at most four distinct values, below the default `max_card=20`.
df = pd.DataFrame({'cat1': [1, 2, 3, 4], 'cont1': [1., 2., 3., 2.], 'cat2': ['a', 'b', 'b', 'a'],
                   'i8': pd.Series([1, 2, 3, 4], dtype='int8'),
                   'u8': pd.Series([1, 2, 3, 4], dtype='uint8'),
                   'f16': pd.Series([1, 2, 3, 4], dtype='float16'),
                   'y1': [1, 0, 1, 0], 'y2': [2, 1, 1, 0]})
ddf = dd.from_pandas(df)
cont_names, cat_names = dask_cont_cat_split(ddf)

In [None]:
#| echo: false
# Print both name lists (the previous format string ended with a stray backtick)
print(f'cont_names: {cont_names}\ncat_names: {cat_names}')

cont_names: ['cont1', 'f16']
cat_names: ['cat1', 'cat2', 'i8', 'u8', 'y1', 'y2']`


In [None]:
#|hide
# Test all columns
cont, cat = dask_cont_cat_split(ddf)
test_eq((cont, cat), (['cont1', 'f16'], ['cat1', 'cat2', 'i8', 'u8', 'y1', 'y2']))

# Test exclusion of dependent variable
cont, cat = dask_cont_cat_split(ddf, dep_var='y1')
test_eq((cont, cat), (['cont1', 'f16'], ['cat1', 'cat2', 'i8', 'u8', 'y2']))

# Test exclusion of multi-label dependent variables
cont, cat = dask_cont_cat_split(ddf, dep_var=['y1', 'y2'])
test_eq((cont, cat), (['cont1', 'f16'], ['cat1', 'cat2', 'i8', 'u8']))

# Test maximal cardinality bound for int variable
# (an integer column is continuous only when its cardinality is strictly greater than `max_card`)
cont, cat = dask_cont_cat_split(ddf, max_card=3)
test_eq((cont, cat), (['cat1', 'cont1', 'i8', 'u8', 'f16'], ['cat2', 'y1', 'y2']))

cont, cat = dask_cont_cat_split(ddf, max_card=2)
test_eq((cont, cat), (['cat1', 'cont1', 'i8', 'u8', 'f16', 'y2'], ['cat2', 'y1']))

cont, cat = dask_cont_cat_split(ddf, max_card=1)
test_eq((cont, cat), (['cat1', 'cont1', 'i8', 'u8', 'f16', 'y1', 'y2'], ['cat2']))

In [None]:
# Example with pandas types and generated columns
df = pd.DataFrame({'cat1': pd.Series(['l','xs','xl','s'], dtype='category'),
                    'ui32': pd.Series([1, 2, 3, 4], dtype='UInt32'),
                    'i64': pd.Series([1, 2, 3, 4], dtype='Int64'),
                    'f16': pd.Series([1, 2, 3, 4], dtype='Float64'),
                    'd1_date': ['2021-02-09', None, '2020-05-12', '2020-08-14'],
                    })
ddf = dd.from_pandas(df)
# `drop=False` keeps the original `d1_date` column next to the generated date parts
ddf = dask_add_datepart(ddf, 'd1_date', drop=False)

ddf['cat1'] = ddf['cat1'].cat.set_categories(['xl','l','m','s','xs'], ordered=True)

# `max_card=0` makes every integer column continuous, whatever its cardinality
cont_names, cat_names = dask_cont_cat_split(ddf, max_card=0)

In [None]:
#| echo: false
print(f'cont_names: {cont_names}\ncat_names: {cat_names}')

cont_names: ['ui32', 'i64', 'f16', 'd1_Year', 'd1_Month', 'd1_Week', 'd1_Day', 'd1_Dayofweek', 'd1_Dayofyear', 'd1_Elapsed']
cat_names: ['cat1', 'd1_date', 'd1_Is_month_end', 'd1_Is_month_start', 'd1_Is_quarter_end', 'd1_Is_quarter_start', 'd1_Is_year_end', 'd1_Is_year_start']


In [None]:
#|hide
# Same call as in the example above, pinned as a test: the categorical, datetime and boolean columns end up in `cat`.
cont, cat = dask_cont_cat_split(ddf, max_card=0)
test_eq((cont, cat), (
    ['ui32', 'i64', 'f16', 'd1_Year', 'd1_Month', 'd1_Week', 'd1_Day', 'd1_Dayofweek', 'd1_Dayofyear', 'd1_Elapsed'],
    ['cat1', 'd1_date', 'd1_Is_month_end', 'd1_Is_month_start', 'd1_Is_quarter_end', 'd1_Is_quarter_start', 'd1_Is_year_end', 'd1_Is_year_start']
    ))

In [None]:
#|export
def get_random_train_mask(df, train_frac=0.8):
    "Return a random boolean mask over the rows of `df`; each row is `True` (train) with probability `train_frac`."
    # Reuse the index of `df` so the mask lines up row-for-row when it is assigned back as a column.
    # A default `RangeIndex` only matches a partition whose own index happens to start at 0.
    return pd.Series(np.random.random(len(df)) < train_frac, index=df.index)

A function to create a random train/validation set mask over the Dask dataframe.

## Tabular -

In [None]:
#|export
# TODO: align this function with the tabular.core version
class _TabIloc:
    "Indexer over `to.items` that selects rows by position and columns by name"
    def __init__(self,to): self.to = to
    def __getitem__(self, idxs):
        frame = self.to.items
        if not isinstance(idxs, tuple):
            # A bare row selector means "all columns"
            return frame.iloc[idxs, slice(None)]
        rows, cols = idxs
        # Column names are translated into a boolean mask (several names) or a position (one name)
        if is_listy(cols): col_sel = frame.columns.isin(cols)
        else: col_sel = frame.columns.get_loc(cols)
        return frame.iloc[rows, col_sel]

In [None]:
#|export
class TabularDask(CollBase, GetAttr, IterableDataset):
    """
    A Dask `DataFrame` wrapper that knows which cols are cont/cat/y, and returns rows in `__iter__`.
    The aim is to replicate the TabularPandas API as closely as possible.

    The wrapped frame is stored in `items` together with a boolean `_int_train_mask` column;
    `subset(0)` selects the rows where the mask is true and any other subset index the remaining rows.
    """
    _default,with_cont='procs',True
    def __init__(
        self, ddf, procs=None, cat_names=None, cont_names=None, y_names=None, y_block=None, train_mask_func=None,
        do_setup=True, device=None, reset_index=True
    ):
        self.items = ddf.copy()
        # Earlier variant, kept for reference: fall back to `get_random_train_mask` when no mask function is given.
        # if "_int_train_mask" not in ddf.columns:
        #     if train_mask_func is None:
        #         train_mask_func = get_random_train_mask
        #     self.items["_int_train_mask"] = ddf.map_partitions(
        #         train_mask_func, meta=pd.Series(name="_int_train_mask", dtype="bool")
        #     )
        if "_int_train_mask" not in ddf.columns:
            if train_mask_func is None:
                # Without a mask function every row is flagged as training data
                self.items["_int_train_mask"] = True
            else:
                # The mask function is applied per partition and must return a boolean series
                self.items["_int_train_mask"] = ddf.map_partitions(
                    train_mask_func, meta=pd.Series(name="_int_train_mask", dtype="bool")
                )
        # NOTE(review): this only rebinds the local `ddf`, which is not read again below, so
        # `reset_index` currently appears to have no effect on `self.items` -- confirm the intent.
        if reset_index: ddf = ddf.reset_index(drop=True)
        # self._dl_type, self._dbunch_type = DaskDataLoader, DaskDataLoaders
        self.y_names, self.device = L(y_names), device

        if y_block is None and self.y_names:
            # Make ys categorical if they're not numeric
            ys = self.items[self.y_names]
            if len(ys.select_dtypes(include='number').columns)!=len(ys.columns):
                y_block = DaskCategoryBlock()
            else:
                y_block = DaskRegressionBlock()
        if y_block is not None and do_setup:
            if callable(y_block): y_block = y_block()
            # A bit hacky, but ensuring compatibility with `CategoryBlock` and `RegressionBlock`
            # TODO: don't think we need this anymore
            if isinstance(y_block.type_tfms[0], Categorize): y_block.type_tfms = DaskCategorize()
            elif isinstance(y_block.type_tfms[0], RegressionSetup): y_block.type_tfms = DaskRegressionSetup()
            # The target transforms run after the user-supplied procs
            procs = L(procs) + y_block.type_tfms

        self.cat_names, self.cont_names, self.procs = L(cat_names), L(cont_names), Pipeline(procs)
        # NOTE(review): `len` of a Dask dataframe presumably triggers a computation over the
        # whole dataset; `new`/`subset` go through `__init__` as well -- confirm the cost is intended.
        self.start, self.end = 0, len(self.items)
        if do_setup: self.setup()

    def new(self, df):
        "Wrap `df` in a new object that shares this object's procs, column names and device, without running setup"
        return type(self)(df, do_setup=False, y_block=TransformBlock(),
                          **attrdict(self, 'procs','cat_names','cont_names','y_names', 'device'))
        
    def subset(self, i):
        "Rows flagged by `_int_train_mask` for `i==0`, the unflagged rows for any other `i`"
        train = self.items['_int_train_mask']
        return self.new(self.items[train if i==0 else ~train])

    # Replaces `items` with a copy of itself and returns this same object (not a new wrapper)
    def copy(self): self.items = self.items.copy(); return self
    def decode(self): return self.procs.decode(self)
    def decode_row(self, row):
        "Decode a single row by wrapping it in a one-row Dask frame and running the procs' `decode`"
        row = row.to_frame().T
        row[list(self.cont_names)] = row[list(self.cont_names)].astype(np.float32)
        row[list(self.cat_names)] = row[list(self.cat_names)].astype(np.int32)
        return self.new(dd.from_pandas(row)).decode().items.compute().iloc[0]

    def show(self, max_n=10, **kwargs):
        "Display up to `max_n` decoded rows; `kwargs` are accepted but not used"
        display_df(
            self.new(self.all_cols).decode().items.head(max_n).drop(columns="_int_train_mask")
        )

    def setup(self): self.procs.setup(self)
    def process(self): self.procs(self)
    # The methods from `loc` to `y` (except `loc` itself) and `all_col_names`/`n_inp` below are
    # listed in the `properties(...)` call after the class body.
    def loc(self): return self.items.loc
    def iloc(self): return _TabIloc(self)
    def targ(self): return self.items[list(self.y_names)]
    def x_names (self): return self.cat_names + self.cont_names
    def n_subsets(self): return 2
    def y(self): return self[self.y_names[0]]
    def new_empty(self): raise NotImplementedError
    def to_device(self, d=None):
        "Record the target device `d` and return `self`; no data is moved here"
        self.device = d
        return self

    def all_col_names(self):
        "Feature names followed by the target names, or only the feature names when any target column is absent from `items`"
        ys = [n for n in self.y_names if n in self.items.columns]
        return self.x_names + self.y_names if len(ys) == len(self.y_names) else self.x_names

    def n_inp(self): return int(len(self.cat_names)>0) + int(len(self.cont_names)>0)
        
    def __iter__(self):
        "Yield one `(cats, conts[, targets])` tuple of lists per row, computing one partition at a time"
        cat_stop = len(self.cat_names)
        con_stop = cat_stop + len(self.cont_names)
        for i in range(self.items.npartitions):
            # df = self.items.get_partition(i).compute()[self.cat_names + self.cont_names + self.y_names]
            # Column order is cats, conts, targets, which is what the positional slicing below relies on
            df = self.items.get_partition(i).compute()[self.all_col_names]
            ys = [n for n in self.y_names if n in self.items.columns]
            for row in df.itertuples(index=False):
                res = (list(row[:cat_stop]), list(row[cat_stop:con_stop]))
                # Targets are appended only when every target column is present
                if len(ys) == len(self.y_names): res = res + (list(row[con_stop:]),) 
                # yield (list(cats), list(conts), list(targ))
                yield res

    def transform(self, cols, f, all_col=True):
        "Apply `f` to the columns `cols` of every partition; with `all_col=False` columns absent from `items` are skipped"
        if not all_col: cols = [c for c in cols if c in self.items.columns]
        if len(cols) > 0:
            # NOTE(review): the output dtype declared to Dask is guessed from the first column only
            # (int16 for categorical names, float32 otherwise) -- verify it matches what `f` returns.
            meta_dtype = "int16" if cols[0] in self.cat_names else "float32"
            meta = pd.DataFrame({c: [] for c in cols}, dtype=meta_dtype)
            self[cols] = self[cols].map_partitions(lambda df: df.transform(f), meta=meta)

    def dataloaders(self, 
        bs:int=64, # Batch size
        shuffle_train:bool=None, # (Deprecated, use `shuffle`) Shuffle training `DataLoader`
        shuffle:bool=True, # Shuffle is currently ignored in `DaskDataLoader`
        val_shuffle:bool=False, # Shuffle validation `DataLoader`
        n:int=None, # Size of `Datasets` used to create `DataLoader`
        path:str|Path='.', # Path to put in `DataLoaders`
        dl_type:DataLoader=None, # Type of `DataLoader`
        dl_kwargs:list=None, # List of kwargs to pass to individual `DataLoader`s
        device:torch.device=None, # Device to put `DataLoaders`
        drop_last:bool=None, # Drop last incomplete batch, defaults to `shuffle`. Currently ignored in `DaskDataLoader`
        val_bs:int=None, # Validation batch size, defaults to `bs`
        **kwargs
    ) -> DataLoaders:
        "Build one `DataLoader` per subset (train first) and bundle them with `self._dbunch_type`"
        if shuffle_train is not None:
            shuffle=shuffle_train
            warnings.warn('`shuffle_train` is deprecated. Use `shuffle` instead.',DeprecationWarning)
        if device is None: device=default_device()
        if dl_kwargs is None: dl_kwargs = [{}] * self.n_subsets
        # NOTE(review): `_dl_type` and `_dbunch_type` are not set anywhere in this class (the
        # assignment in `__init__` is commented out); presumably they are attached elsewhere -- verify.
        if dl_type is None: dl_type = self._dl_type
        # if drop_last is None: drop_last = shuffle
        # Both options are forced off; because `shuffle` defaults to True the warning fires by default
        if shuffle or drop_last:
            shuffle, drop_last = False, False
            warnings.warn('`shuffle` and `drop_last` are currently ignored.')
        # Keyword arguments prefixed with `val_` are passed to the validation loaders without the prefix
        val_kwargs={k[4:]:v for k,v in kwargs.items() if k.startswith('val_')}
        def_kwargs = {'bs':bs,'shuffle':shuffle,'drop_last':drop_last,'n':n,'device':device}
        dl = dl_type(self.subset(0), **merge(kwargs,def_kwargs, dl_kwargs[0]))
        def_kwargs = {'bs':bs if val_bs is None else val_bs,'shuffle':val_shuffle,'n':None,'drop_last':False}
        dls = [dl] + [dl.new(self.subset(i), **merge(kwargs,def_kwargs,val_kwargs,dl_kwargs[i]))
                      for i in range(1, self.n_subsets)]
        return self._dbunch_type(*dls, path=path, device=device)

# Presumably turns the listed zero-argument methods into properties (fastcore `properties`) -- the
# class body already reads them as attributes, e.g. `self.all_col_names` and `self.n_subsets`.
properties(TabularDask,'iloc','targ','all_col_names','n_subsets','x_names','y', 'n_inp')

* `ddf`: A Dask `DataFrame` of your data
* `cat_names`: Your categorical `x` variables
* `cont_names`: Your continuous `x` variables
* `y_names`: Your dependent `y` variables
  * Note: Mixed y's, such as regression and classification together, are not currently supported; however, multiple regression or multiple classification outputs are
* `y_block`: How to sub-categorize the type of `y_names` (`CategoryBlock` or `RegressionBlock`)
* `train_mask_func`: A function that creates a train/validation mask over a `DataFrame`. See `get_random_train_mask` for an example.
* `do_setup`: Whether `TabularDask` will run the data through the `procs` upon initialization
* `device`: `cuda` or `cpu`

In [None]:
#|export
def _add_prop(cls, nm):
    # Installs a read/write property `<nm>s` on `cls`: reading indexes the object with the list of
    # names stored in `<nm>_names`, writing assigns to the object under those same names.
    names_attr, prop_name = nm+'_names', nm+'s'
    def _get(o): return o[list(getattr(o, names_attr))]
    def _set(o, v): o[getattr(o, names_attr)] = v
    setattr(cls, prop_name, property(_get, _set))

# Adds the `cats`, `conts`, `ys`, `xs` and `all_cols` properties, backed by
# `cat_names`, `cont_names`, `y_names`, `x_names` and `all_col_names` respectively.
_add_prop(TabularDask, 'cat')
_add_prop(TabularDask, 'cont')
_add_prop(TabularDask, 'y')
_add_prop(TabularDask, 'x')
_add_prop(TabularDask, 'all_col')

In [None]:
#|export
# Presumably `train` and `valid` become properties that return `subset(0)` and `subset(1)` -- see fastcore `add_props`.
TabularDask.train, TabularDask.valid = add_props(lambda i,x: x.subset(i))

In [None]:
#|hide
# Round-trip the object through pickle and check that the wrapped frame still compares equal.
df = pd.DataFrame({'a':[0,1,2,0,2], 'b':[0,0,0,0,1]})
ddf = dd.from_pandas(df)
to = TabularDask(ddf, cat_names='a')
t = pickle.loads(pickle.dumps(to))
test_eq(t.items,to.items)
# With only `a` declared as a feature, `all_cols` is just that column
test_eq(to.all_cols,to[['a']])

In [None]:
#|hide
import gc

In [None]:
#|hide
def _count_objs(o):
    "Counts number of instances of class `o`"
    # NOTE(review): `o` is not used below -- the count is always of `pd.DataFrame` objects.
    objs = gc.get_objects()
    return len([x for x in objs if isinstance(x, pd.DataFrame)])

df = pd.DataFrame({'a':[0,1,2,0,2], 'b':[0,0,0,0,1]})
df_b = pd.DataFrame({'a':[1,2,0,0,2], 'b':[1,0,3,0,1]})
ddf, ddf_b = dd.from_pandas(df), dd.from_pandas(df_b)

# NOTE(review): this exercises `TabularPandas`, not `TabularDask`; `ddf` and `ddf_b` are created
# above but not used in this cell -- confirm which class the test is meant to cover.
to = TabularPandas(df, cat_names='a', inplace=True)

# Creating a new object in place must not increase the number of live dataframes
_init_count = _count_objs(dd.DataFrame)
to_new = to.new(df_b, inplace=True)
test_eq(_init_count, _count_objs(dd.DataFrame))



## Transforms

These transforms inherit from `TabularProc` and are applied as soon as the data is available rather than as data is called from the `DataLoader`

In [None]:
#|export
class DaskCategoryMap(CategoryMap):
    "Dask implementation of CategoryMap. Collection of categories with the reverse mapping in `o2i`"
    def __init__(self, col, sort=True, add_na=False, strict=False):
        # `col` may be a pandas or Dask series, or any plain collection of values.
        # `sort` only applies to non-categorical input; `strict` only to categorical input.
        if hasattr(col, 'dtype') and isinstance(col.dtype, CategoricalDtype):
            items = L(col.cat.categories, use_list=True)
            #Remove non-used categories while keeping order
            if strict:
                # Materialise the used values once: evaluating `col.unique()` inside the filter
                # repeats the work for every category and, for Dask, tests membership against a
                # lazy collection instead of concrete values.
                used = col.unique()
                if hasattr(used, "compute"): used = used.compute()
                used = set(used)
                items = L(o for o in items if o in used)
        else:
            if not hasattr(col,'unique'): col = L(col, use_list=True)
            items = col.unique()
            # Dask compatibility
            if hasattr(items, "compute"):
                items = items.compute()
                # Dask sometimes (always?) represents NANs as `pandas._libs.missing.NAType` values 
                # which do not work with the `o==o` condition (TypeError: boolean value of NA is ambiguous)
                items = items.dropna()
            # `o==o` is the generalized definition of non-NaN used by Pandas
            items = L(o for o in items if o==o)
            if sort: items = items.sorted()
        self.items = '#na#' + items if add_na else items
        # With `add_na` unknown values map to index 0 (the `#na#` entry); without it they raise `KeyError`
        self.o2i = defaultdict(int, self.items.val2idx()) if add_na else dict(self.items.val2idx())

In [None]:
#|hide
a = DaskCategoryMap(pd.Series([9,1,2,7,3,7,1,9]))
a.items, a.o2i

((#5) [1,2,3,7,9], {1: 0, 2: 1, 3: 2, 7: 3, 9: 4})

In [None]:
#|hide
a = DaskCategoryMap(pd.Series([9,1,2,7,3,7,1,9]), sort=False)
a.items, a.o2i

((#5) [9,1,2,7,3], {9: 0, 1: 1, 2: 2, 7: 3, 3: 4})

In [None]:
#|hide
a = DaskCategoryMap([9,1,2,7,3,7,1,9], sort=False)
a.items, a.o2i

((#5) [9,1,2,7,3], {9: 0, 1: 1, 2: 2, 7: 3, 3: 4})

In [None]:
#|hide
a = DaskCategoryMap(dd.from_pandas(pd.Series([9,1,2,7,3,7,1,9])), sort=False)
a.items, a.o2i

((#5) [9,1,2,7,3], {9: 0, 1: 1, 2: 2, 7: 3, 3: 4})

In [None]:
#|export
class DaskCategorify(TabularProc):
    "Transform the categorical variables to something similar to `pd.Categorical`"
    order = 1
    def __init__(self, cat_vocabs:'dict | None'=None):
        # Vocabularies supplied up front keep their given order and get no `#na#` entry
        given = {} if cat_vocabs is None else cat_vocabs
        classes = {name: DaskCategoryMap(vals, sort=False, add_na=False) for name, vals in given.items()}
        store_attr(classes=classes, but='to')

    def setups(self, to):
        # Only columns without a user-supplied vocabulary need one built from the data
        pending = [n for n in to.cat_names if n not in self.classes]
        # Cast numeric and boolean categorical columns to the `category` dtype
        numeric = list(to.items[pending].select_dtypes(include=['number', 'bool']).columns)
        to.items[numeric] = to.items[numeric].astype('category')
        # Vocabularies come from the training rows when `to` exposes a `train` subset
        source = getattr(to, 'train', to)
        categorized = source.items[pending].categorize()
        for n in pending:
            self.classes[n] = DaskCategoryMap(categorized[n], add_na=(n in to.cat_names))

    def encodes(self, to):
        encode = partial(_apply_cats, voc=self.classes, add=1)
        to.transform(list(self.classes.keys()), encode)
    def decodes(self, to):
        decode = partial(_decode_cats, voc=self.classes)
        to.transform(list(self.classes.keys()), decode)
    def __getitem__(self,k): return self.classes[k]

In [None]:
#|hide
a = DaskCategorify({'a': pd.Series([9,1,2,7,3,7,1,9])})
a.classes

{'a': [9, 1, 2, 7, 3]}

In [None]:
#|hide
a = DaskCategorify({'a': [9,1,2,7,3,7,1,9]})
a.classes

{'a': [9, 1, 2, 7, 3]}

In [None]:
#|hide
a = DaskCategorify({'a': dd.from_pandas(pd.Series([9,1,2,7,3,7,1,9]))})
a.classes

{'a': [9, 1, 2, 7, 3]}

In [None]:
#|hide
a = DaskCategorify({'a': [9,1,2,7,3,7,1,9], 'b': [0,1,2]})
a.classes

{'a': [9, 1, 2, 7, 3], 'b': [0, 1, 2]}

In [None]:
#|export
def _apply_cats (c, voc, add):
    "Encode column `c` as integer codes against the vocabulary `voc[c.name]`, shifted by `add`."
    # The first `add` vocabulary entries are skipped (the `#na#` placeholder when `add=1`); values
    # that are not in the vocabulary get code -1 and therefore end up as `add - 1`.
    cats = list(voc[c.name][add:])
    if not (hasattr(c, 'dtype') and isinstance(c.dtype, CategoricalDtype)):
        return pd.Categorical(c, categories=cats).codes+add
    # Re-code categorical columns against the vocabulary as well: their own categories may be
    # ordered differently or contain entries that are absent from the vocabulary, in which case
    # the column's raw codes would not match the vocabulary indices.
    return c.cat.set_categories(cats).cat.codes+add
def _decode_cats(c, voc):
    "Map the integer codes in `c` back to the entries of `voc[c.name].items`."
    code_to_label = {code: label for code, label in enumerate(voc[c.name].items)}
    return c.map(code_to_label)

In [None]:
show_doc(DaskCategorify, title_level=3)

---

[source](https://github.com/stefan027/bigtabular/blob/main/bigtabular/core.py#L248){target="_blank" style="float:right; font-size:smaller"}

### DaskCategorify

>      DaskCategorify (cat_vocabs:"'dict|None'"=None)

*Transform the categorical variables to something similar to `pd.Categorical`*

The `Categorify` class from `fastai.tabular.core.Categorify` is modified to:
  - be compatible with Dask
  - accept existing vocabs through the `cat_vocabs` input

While visually in the `DataFrame` you will not see a change, the classes are stored in `to.procs.categorify` as we can see below on a dummy `DataFrame`:

In [None]:
# `DaskCategorify` is passed as the proc and `'a'` as the only categorical column.
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,0,2]}))
to = TabularDask(ddf, DaskCategorify, 'a')
to.show()

Unnamed: 0,a
0,0
1,1
2,2
3,0
4,2


Each column's unique values are stored in a dictionary of `column:[values]`:

In [None]:
cat = to.procs.dask_categorify
cat.classes

{'a': ['#na#', 0, 1, 2]}

In [None]:
#|hide
# Helper: compare the values of a series-like `a` with the plain list `b`
def test_series(a,b): return test_eq(list(a), b)
test_series(cat['a'], ['#na#',0,1,2])
# Stored codes are shifted by one because index 0 is reserved for `#na#`
test_series(to['a'], [1,2,3,1,3])

In [None]:
#|hide
# Encode new data with the vocabulary that was built on `to`
ddf1 = dd.from_pandas(pd.DataFrame({'a':[1,0,3,-1,2]}))
to1 = to.new(ddf1)
to1.process()
#Values that weren't in the training df are sent to 0 (na)
test_series(to1['a'], [2,1,0,0,3])
to2 = cat.decode(to1)
test_series(to2['a'], [1,0,'#na#','#na#',2])

We can provide an existing vocab if one is available, for example if pretrained weights will be used for a categorical variable:

In [None]:
ddf = dd.from_pandas(pd.DataFrame({'a':['Cat','Dog','Lion','Leopard','Honey badger']}))

With default vocab:

In [None]:
# No vocab supplied: the classes are built from the data, sorted, with `#na#` in front
to = TabularDask(ddf, DaskCategorify, 'a')
cat = to.procs.dask_categorify
cat.classes

{'a': ['#na#', 'Cat', 'Dog', 'Honey badger', 'Leopard', 'Lion']}

With predefined vocab:

In [None]:
# A supplied vocab is used as given: its order is kept and no `#na#` entry is added
vocab = {'a': ['Honey badger', 'Dog', 'Cat', 'Lion','Leopard']}
to = TabularDask(ddf, DaskCategorify(cat_vocabs=vocab), 'a')
cat = to.procs.dask_categorify
cat.classes

{'a': ['Honey badger', 'Dog', 'Cat', 'Lion', 'Leopard']}

In [None]:
#|hide
#test with splits
cat = DaskCategorify()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,2]}))
# Rows with `a <= 2` are training rows, so the value 3 only occurs in the validation rows
to = TabularDask(ddf, cat, 'a', train_mask_func=lambda df: df['a'] <=2)
test_series(cat['a'], ['#na#',0,1,2])
# The validation-only value 3 is encoded as 0 (`#na#`)
test_series(to['a'], [1,2,3,0,3])

In [None]:
#|hide
# An ordered categorical column keeps its declared category order (H, M, L) instead of being sorted
ddf = dd.from_pandas(pd.DataFrame({'a':pd.Categorical(['M','H','L','M'], categories=['H','M','L'], ordered=True)}))
to = TabularDask(ddf, DaskCategorify, 'a')
cat = to.procs.dask_categorify
test_series(cat['a'], ['#na#','H','M','L'])
test_series(to.items.a, [2,1,3,2])
to2 = cat.decode(to)
test_series(to2['a'], ['M','H','L','M'])

In [None]:
#|export
class DaskNormalize(TabularProc):
    "Normalize continuous columns with the mean and standard deviation of the training rows"
    parameters,order = L('mean', 'std'),99
    def __init__(self, cols=None):
        # An empty `cols` means "every continuous column"; that is resolved in `setups`
        self.cols = listify(cols)

    def setups(self, to):
        if not self.cols: self.cols = listify(to.cont_names)
        # Build the training subset once and reuse it for both statistics
        train_conts = getattr(to, 'train', to).items[self.cols]
        # The small epsilon keeps the division in `encodes` finite for constant columns (std == 0)
        store_attr(but='to', means=train_conts.mean().compute(),
                   stds=train_conts.std(ddof=0).compute()+1e-7)
        return self(to)

    def encodes(self, to):
        # Standardise partition by partition with the stored statistics
        to.items[self.cols] = to.items[self.cols].map_partitions(lambda df: (df-self.means) / self.stds)
        return to

    def decodes(self, to):
        # Inverse of `encodes`
        to.items[self.cols] = to.items[self.cols].map_partitions(lambda df: (df*self.stds) + self.means)
        return to

In [None]:
#|export
class DaskCategorize(DisplayedTransform):
    "Dask-compatible transform that encodes the target column(s) as integer category codes"
    loss_func,order=CrossEntropyLossFlat(),1
    def __init__(self, vocab=None, sort=True, add_na=False):
        # Keep the options on the instance: `setups` reads `add_na`, which was never stored
        # before and made every call with a supplied vocab fail with `AttributeError`.
        self.sort, self.add_na = sort, add_na
        self.vocab = vocab
        if vocab is not None: self.vocab = DaskCategoryMap(vocab, sort=sort, add_na=add_na)

    def setups(self, to):
        if len(to.y_names) > 0:
            if self.vocab is None:
                # Build the vocab from the first target column of the training rows
                self.vocab = DaskCategoryMap(getattr(to, 'train', to).iloc[:,to.y_names[0]], strict=True)
            elif not isinstance(self.vocab, CategoryMap):
                # Only a raw vocab assigned after construction still needs wrapping; a vocab built
                # in `__init__` is kept as is so that `#na#` is not prepended a second time.
                self.vocab = DaskCategoryMap(self.vocab, sort=False, add_na=self.add_na)
            self.c = len(self.vocab)
        return self(to)

    def encodes(self, to):
        # Every target column is encoded with the same vocab; target columns absent from the data are skipped
        to.transform(to.y_names, partial(_apply_cats, voc={n: self.vocab for n in to.y_names}, add=0), all_col=False)
        return to

    def decodes(self, to):
        to.transform(to.y_names, partial(_decode_cats, voc={n: self.vocab for n in to.y_names}), all_col=False)
        return to

In [None]:
#|export
class DaskFillStrategy:
    "Namespace containing the various filling strategies."
    def median(c, fill):
        "Approximate median of every column of `c`, computed with Dask; `fill` is not used"
        approx = c.median_approximate()
        return approx.compute()
    def constant(c, fill):
        "Look up the fill value of every column of `c` in `fill`"
        return dict((name, fill[name]) for name in c.columns)
    def mode(c, fill):
        "Most frequent non-null value of every column of `c`, one Dask computation per column; `fill` is not used"
        modes = {}
        for name in c.columns:
            modes[name] = c[name].dropna().value_counts().idxmax().compute()
        return modes

Currently, filling with the `median`, a `constant`, and the `mode` are supported.

In [None]:
#|export
class DaskFillMissing(TabularProc):
    "Fill the missing values in continuous columns."
    def __init__(self, fill_strategy=DaskFillStrategy.median, add_col=True, fill_vals=None):
        # The default `fill_vals` yields 0 for every column (used by the `constant` strategy)
        if fill_vals is None: fill_vals = defaultdict(int)
        store_attr()

    def setups(self, to):
        # Find the continuous columns that contain at least one null value
        missing = to.conts.isnull().any().compute()
        missing_cols = list(missing[missing].keys())
        # Nothing to fill: skip the strategy call (and its Dask computation) instead of running
        # it on an empty column selection
        na_dict = dict(self.fill_strategy(to[missing_cols], self.fill_vals)) if missing_cols else {}
        store_attr(but='to', na_dict=na_dict)
        # From here on only the strategy's name is kept, not the callable itself
        self.fill_strategy = self.fill_strategy.__name__

    def encodes(self, to):
        # `missing` is taken before filling so the `_na` indicator columns reflect the original nulls
        missing = to.conts.isnull()
        missing_any = missing.any().compute()
        for n in missing_any[missing_any].keys():
            assert n in self.na_dict, f"nan values in `{n}` but not in setup training set"
        if self.na_dict:
            to.items = to.items.fillna(self.na_dict)
            if self.add_col:
                for n in self.na_dict.keys():
                    # The indicator column is also registered as a categorical feature
                    to.items[n+'_na'] = missing[n]
                    if n+'_na' not in to.cat_names: to.cat_names.append(n+'_na')

In [None]:
show_doc(DaskFillMissing, title_level=3)

---

[source](https://github.com/stefan027/bigtabular/blob/main/bigtabular/core.py#L335){target="_blank" style="float:right; font-size:smaller"}

### DaskFillMissing

>      DaskFillMissing (fill_strategy=<function median>, add_col=True,
>                       fill_vals=None)

*Fill the missing values in continuous columns.*

In [None]:
#|export
class DaskRegressionSetup(DisplayedTransform):
    "A Dask-compatible transform that floatifies targets"
    loss_func=MSELossFlat()
    def __init__(self, c=None): store_attr()

    def setups(self, to):
        # Infer the number of outputs only when it was not given explicitly ...
        if self.c is None: self.c = len(to.y_names)
        # ... but always encode: returning early when `c` was given left the targets un-cast
        return self(to)

    def encodes(self, to):
        # Cast every target column that is present in the data to float
        for c in to.y_names:
            if c in to.items.columns: to[c] = to[c].astype("float")
        return to
    # Decoding leaves the data as it is
    def decodes(self, to): return to

We define basic `TransformBlock`s that are compatible with the Dask transforms:

In [None]:
#|export
def DaskCategoryBlock(
    vocab:MutableSequence|pd.Series=None, # List of unique class names
    sort:bool=True, # Sort the classes alphabetically
    add_na:bool=False, # Add `#na#` to `vocab`
):
    "A Dask-compatible `TransformBlock` for single-label categorical targets"
    categorize = DaskCategorize(vocab=vocab, sort=sort, add_na=add_na)
    return TransformBlock(type_tfms=categorize)

In [None]:
#|export
def DaskRegressionBlock(
    n_out:int=None, # Number of output values
):
    "A Dask-compatible `TransformBlock` for float targets"
    regression_setup = DaskRegressionSetup(c=n_out)
    return TransformBlock(type_tfms=regression_setup)

In [None]:
#|hide
#test with targets
cat = DaskCategorify()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,2], 'b': ['a', 'b', 'a', 'b', 'b']}))
def split_func(df): return df['a'] <=2
# `b` is a string column, so a categorical target block is chosen automatically
to = TabularDask(ddf, cat, 'a', train_mask_func=split_func, y_names='b')
test_series(to.vocab, ['a', 'b'])
test_series(to['b'], [0,1,0,1,1])
to2 = to.procs.decode(to)
test_series(to2['b'], ['a', 'b', 'a', 'b', 'b'])

In [None]:
#|hide
# NOTE(review): this cell repeats the preceding "test with targets" cell line for line -- confirm whether both are needed.
cat = DaskCategorify()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,2], 'b': ['a', 'b', 'a', 'b', 'b']}))
def split_func(df): return df['a'] <=2
to = TabularDask(ddf, cat, 'a', train_mask_func=split_func, y_names='b')
test_series(to.vocab, ['a', 'b'])
test_series(to['b'], [0,1,0,1,1])
to2 = to.procs.decode(to)
test_series(to2['b'], ['a', 'b', 'a', 'b', 'b'])

In [None]:
#|hide
#test with targets and train
cat = DaskCategorify()
# The label 'c' only occurs in the row with `a == 3`, which the split below puts in the validation rows
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,2], 'b': ['a', 'b', 'a', 'c', 'b']}))
def split_func(df): return df['a'] <=2
to = TabularDask(ddf, cat, 'a', train_mask_func=split_func, y_names='b')
# The target vocab is built from the training rows only, so 'c' is not part of it
test_series(to.vocab, ['a', 'b'])

In [None]:
#|hide
#test to ensure no copies of the dataframe are stored
cat = DaskCategorify()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,4]}))
def split_func(df): return df['a'] <=2
to = TabularDask(ddf, cat, cont_names='a', train_mask_func=split_func)
# The proc must not keep the `TabularDask` object around as a `to` attribute
test_eq(hasattr(to.procs.dask_categorify, 'to'), False)

In [None]:
#|hide
# without a train mask the expected statistics are the mean and `np.std` (population std) of the whole column
norm = DaskNormalize()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,4]}))
to = TabularDask(ddf, norm, cont_names='a')
x = np.array([0,1,2,3,4])
m,s = x.mean(),x.std()
test_eq(norm.means['a'], m)
test_close(norm.stds['a'], s)
test_close(to['a'].compute().values, (x-m)/s)

In [None]:
#|hide
# relies on `to`, `norm`, `m` and `s` from the previous cell: new data is expected to be normalized
# with the already computed statistics and to decode back to the raw values
ddf1 = dd.from_pandas(pd.DataFrame({'a':[5,6,7]}))
to1 = to.new(ddf1)
to1.process()
test_close(to1['a'].compute().values, (np.array([5,6,7])-m)/s)
to2 = norm.decode(to1)
test_close(to2['a'].compute().values, [5,6,7])

In [None]:
#|hide
# with a train mask (a <= 2) the expected statistics come from the training values [0, 1, 2] only,
# while the normalization itself is expected on all five rows
norm = DaskNormalize()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,4]}))
to = TabularDask(ddf, norm, cont_names='a', train_mask_func=lambda df: df['a'] <=2)
x = np.array([0,1,2])
m,s = x.mean(),x.std()
test_eq(norm.means['a'], m)
test_close(norm.stds['a'], s)
test_close(to['a'].compute().values, (np.array([0,1,2,3,4])-m)/s)

In [None]:
#|hide
# the proc is expected not to keep a `to` attribute after setup
norm = DaskNormalize()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,3,4]}))
to = TabularDask(ddf, norm, cont_names='a', train_mask_func=lambda df: df['a'] <=2)
test_eq(hasattr(to.procs.dask_normalize, 'to'), False)

In [None]:
#|hide
# one `DaskFillMissing` per fill strategy, each applied to its own copy of the same data
fill1,fill2,fill3 = (DaskFillMissing(fill_strategy=s)
                     for s in [DaskFillStrategy.median, DaskFillStrategy.constant, DaskFillStrategy.mode])
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,np.nan,1,2,3,4]}))
ddf1 = ddf.copy(); ddf2 = ddf.copy()
tos = (TabularDask(ddf, fill1, cont_names='a'),
       TabularDask(ddf1, fill2, cont_names='a'),
       TabularDask(ddf2, fill3, cont_names='a'))
# expected fill values for `a`: median 1.5, constant 0, mode 1.0
test_eq(fill1.na_dict, {'a': 1.5})
test_eq(fill2.na_dict, {'a': 0})
test_eq(fill3.na_dict, {'a': 1.0})

# every strategy is expected to add the indicator column `a_na` to `cat_names`
for t in tos: test_eq(t.cat_names, ['a_na'])

# the missing entry (position 2) is replaced by the fill value and flagged in `a_na`
for to_,v in zip(tos, [1.5, 0., 1.]):
    test_eq(to_.items.compute()['a'].values, np.array([0, 1, v, 1, 2, 3, 4]))
    test_eq(to_.items.compute()['a_na'].values, np.array([0, 0, 1, 0, 0, 0, 0]))

In [None]:
#|hide
# default `DaskFillMissing`: only `a` contains a missing value, so only `a` is expected to get a fill value
# and an `a_na` indicator, while `b` is expected to stay unchanged
fill = DaskFillMissing()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,np.nan,1,2,3,4], 'b': [0,1,2,3,4,5,6]}))
to = TabularDask(ddf, fill, cont_names=['a', 'b'])
test_eq(fill.na_dict, {'a': 1.5})
test_eq(to.cat_names, ['a_na'])

test_eq(to.items.compute()['a'].values, np.array([0, 1, 1.5, 1, 2, 3, 4]))
test_eq(to.items.compute()['a_na'].values, np.array([0, 0, 1, 0, 0, 0, 0]))
test_eq(to.items.compute()['b'].values, np.array([0,1,2,3,4,5,6]))

In [None]:
#|hide
# the proc is expected not to keep a `to` attribute after setup
fill = DaskFillMissing()
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,np.nan,1,2,3,4], 'b': [0,1,2,3,4,5,6]}))
to = TabularDask(ddf, fill, cont_names=['a', 'b'])
test_eq(hasattr(to.procs.dask_fill_missing, 'to'), False)

## TabularDask Pipelines -

In [None]:
#|hide
# `procs` is reused by the following test cells
procs = [DaskNormalize, DaskCategorify, DaskFillMissing, noop]
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,1,1,2,0], 'b':[0,1,np.nan,1,2,3,4]}))
to = TabularDask(ddf, procs, cat_names='a', cont_names='b')

#Test setup and apply on df_main
test_series(to.cat_names, ['a', 'b_na'])
# category codes are offset by one because '#na#' takes index 0 in `classes` (checked below)
test_series(to.items.compute()['a'], [1,2,3,2,2,3,1])
test_series(to.items.compute()['b_na'], [1,1,2,1,1,1,1])
# the expected values assume the missing `b` entry was filled with 1.5 before the normalization statistics
# were computed, although `DaskNormalize` is listed first in `procs`
# (presumably the procs are ordered internally - confirm)
x = np.array([0,1,1.5,1,2,3,4])
m,s = x.mean(),x.std()
test_close(to.items.compute()['b'].values, (x-m)/s)
test_eq(to.classes, {'a': ['#na#',0,1,2], 'b_na': ['#na#',False,True]})

In [None]:
#|hide
#Test apply on y_names
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,1,1,2,0], 'b':[0,1,np.nan,1,2,3,4], 'c': ['b','a','b','a','a','b','a']}))
to = TabularDask(ddf, procs, 'a', 'b', y_names='c')

test_series(to.cat_names, ['a', 'b_na'])
test_series(to.items.compute()['a'], [1,2,3,2,2,3,1])
test_series(to.items.compute()['b_na'], [1,1,2,1,1,1,1])
test_series(to.items.compute()['c'], [1,0,1,0,0,1,0])
x = np.array([0,1,1.5,1,2,3,4])
m,s = x.mean(),x.std()
test_close(to.items.compute()['b'].values, (x-m)/s)
test_eq(to.classes, {'a': ['#na#',0,1,2], 'b_na': ['#na#',False,True]})
test_eq(to.vocab, ['a','b'])

In [None]:
#|hide
# same setup as the "Test apply on y_names" cell (uses `procs` defined earlier);
# additionally checks that the input `ddf` still has an integer dtype for `a`
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,1,1,2,0], 'b':[0,1,np.nan,1,2,3,4], 'c': ['b','a','b','a','a','b','a']}))
to = TabularDask(ddf, procs, 'a', 'b', y_names='c')

test_series(to.cat_names, ['a', 'b_na'])
test_series(to.items.compute()['a'], [1,2,3,2,2,3,1])
test_eq(ddf.a.dtype, np.int64 if sys.platform == "win32" else int)
test_series(to.items.compute()['b_na'], [1,1,2,1,1,1,1])
test_series(to.items.compute()['c'], [1,0,1,0,0,1,0])

In [None]:
#|hide
# `split_func` ignores `df` and returns a fixed mask that marks rows 0, 1, 4 and 6 as training rows;
# the assertions below only look at `to.train`
ddf = dd.from_pandas(pd.DataFrame({'a':[0,1,2,1,1,2,0], 'b':[0,np.nan,1,1,2,3,4], 'c': ['b','a','b','a','a','b','a']}))
def split_func(df): return pd.Series([True, True, False, False, True, False, True])
to = TabularDask(ddf, procs, cat_names='a', cont_names='b', y_names='c', train_mask_func=split_func)

test_series(to.cat_names, ['a', 'b_na'])
test_series(to.train.items.compute()['a'], [1,2,2,1])
test_eq(ddf.a.dtype, np.int64 if sys.platform == "win32" else int)
test_series(to.train.items.compute()['b_na'], [1,2,1,1])
test_series(to.train.items.compute()['c'], [1,0,0,0])

In [None]:
#|export
class DaskDataLoader(DataLoader):
    "Iterable dataloader for tabular learning with Dask"
    # TODO: align with TabDataLoader + ReadTabBatch (fastai.tabular.core)?
    def create_batch(self, b):
        "Collate a list of `(cats, conts[, targets])` samples into a tuple of tensors"
        # transpose the list of samples into one array per field
        b = list(map(np.array, zip(*b)))
        cats, conts = tensor(b[0]).long(), tensor(b[1]).float()
        res = (cats, conts)
        # add target if available
        if len(b) > 2: res = res + (tensor(b[2]),)
        return res

    def decode(self, b):
        "Decode the pandas `DataFrame` `b` with the dataset's `decode` and return the computed pandas `DataFrame`"
        # wrap `b` in a new dataset object derived from `self.dataset` so that its `decode` can be reused
        tmp = self.dataset.new(dd.from_pandas(b))
        # `_int_train_mask` is presumably an internal helper column; it is removed before the frame is returned
        return tmp.decode().items.compute().drop(columns="_int_train_mask")

    def show_batch(self,
        b=None, # Batch to show
        max_n:int=9, # Maximum number of items to show
        show:bool=True, # Whether to display data
    ):
        "Show `max_n` input(s) and target(s) from the batch."
        if b is None: b = self.one_batch()
        x1 = pd.DataFrame(b[0][:max_n].cpu().numpy(), columns=self.dataset.cat_names)
        x2 = pd.DataFrame(b[1][:max_n].cpu().numpy(), columns=self.dataset.cont_names)
        b_ = [x1, x2]
        if len(b) > 2:
            y = pd.DataFrame(b[2][:max_n].cpu().numpy(), columns=self.dataset.y_names)
            b_.append(y)
        b = pd.concat(b_, axis=1)
        # with `show=False` the frame is returned before decoding (`show_results` relies on this)
        if not show: return b
        b = self.decode(b)
        display_df(b)

    def show_results(self,
        b, # Batch to show results for
        out, # Predicted output from model for the batch
        max_n:int=9, # Maximum number of items to show
        ctxs=None, # List of `ctx` objects to show data. Could be matplotlib axis, DataFrame etc
        show:bool=True, # Whether to display data
        **kwargs
    ):
        "Show `max_n` results with input(s), target(s) and prediction(s)."
        # `ctxs` and `kwargs` are accepted but not used by this implementation
        df = self.show_batch(b, max_n=max_n, show=False)
        # `detach` so that outputs which still require grad can be converted to numpy
        preds = out[:max_n].detach().cpu().numpy()
        yhat = pd.DataFrame(preds, columns=[n+'_pred' for n in self.dataset.y_names])
        res = pd.concat([df, yhat], axis=1)
        # mirror `show_batch`: hand the frame back instead of silently discarding it
        if not show: return res
        display_df(res)

TabularDask._dl_type = DaskDataLoader
TabularDask._dbunch_type = DataLoaders

## Integration example

For a more in-depth explanation, see the [BigTabular tutorial](tutorial.html)

In [None]:
path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path/'adult.csv')
# first 10000 rows are used for training/validation, the remainder as an unlabelled test set
df_main = df.iloc[:10000].copy()
df_test = df.iloc[10000:].drop(columns='salary')
ddf_main, ddf_test = dd.from_pandas(df_main), dd.from_pandas(df_test)
ddf_main.head()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,49,Private,101320,Assoc-acdm,12.0,Married-civ-spouse,,Wife,White,Female,0,1902,40,United-States,>=50k
1,44,Private,236746,Masters,14.0,Divorced,Exec-managerial,Not-in-family,White,Male,10520,0,45,United-States,>=50k
2,38,Private,96185,HS-grad,,Divorced,,Unmarried,Black,Female,0,0,32,United-States,<50k
3,38,Self-emp-inc,112847,Prof-school,15.0,Married-civ-spouse,Prof-specialty,Husband,Asian-Pac-Islander,Male,0,0,40,United-States,>=50k
4,42,Self-emp-not-inc,82297,7th-8th,,Married-civ-spouse,Other-service,Wife,Black,Female,0,0,50,United-States,<50k


In [None]:
cat_names = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race']
cont_names = ['age', 'fnlwgt', 'education-num']
procs = [DaskCategorify, DaskFillMissing, DaskNormalize]

In [None]:
to = TabularDask(
    ddf_main, procs, cat_names, cont_names, y_names="salary", train_mask_func=get_random_train_mask
)

In [None]:
dls = to.dataloaders()
dls.valid.show_batch()



Unnamed: 0,workclass,education,marital-status,occupation,relationship,race,education-num_na,age,fnlwgt,education-num,salary
0,Private,HS-grad,Divorced,#na#,Unmarried,Black,True,38.0,96185.000419,10.0,<50k
1,Private,HS-grad,Widowed,#na#,Unmarried,White,False,51.0,284329.003227,9.0,<50k
2,Private,Bachelors,Married-civ-spouse,Prof-specialty,Husband,White,True,45.0,267966.997633,10.0,>=50k
3,Self-emp-not-inc,Some-college,Divorced,#na#,Unmarried,White,True,47.0,213745.000003,10.0,<50k
4,Private,Bachelors,Never-married,Adm-clerical,Not-in-family,Asian-Pac-Islander,False,23.999999,162593.001373,13.0,<50k
5,Local-gov,Masters,Divorced,Exec-managerial,Unmarried,White,True,46.0,124071.002166,10.0,>=50k
6,Local-gov,Masters,Widowed,Prof-specialty,Unmarried,White,False,49.0,78858.996802,14.0,<50k
7,Self-emp-not-inc,Assoc-acdm,Married-civ-spouse,#na#,Husband,White,True,38.0,133298.99901,10.0,<50k
8,Self-emp-inc,Some-college,Married-civ-spouse,Sales,Husband,White,False,54.000001,206963.999516,10.0,>=50k


In [None]:
to.show()

Unnamed: 0,workclass,education,marital-status,occupation,relationship,race,education-num_na,age,fnlwgt,education-num,salary
0,Private,Assoc-acdm,Married-civ-spouse,#na#,Wife,White,False,49.0,101320.0,12.0,>=50k
1,Private,Masters,Divorced,Exec-managerial,Not-in-family,White,False,44.0,236746.0,14.0,>=50k
2,Private,HS-grad,Divorced,#na#,Unmarried,Black,True,38.0,96185.0,10.0,<50k
3,Self-emp-inc,Prof-school,Married-civ-spouse,Prof-specialty,Husband,Asian-Pac-Islander,False,38.0,112847.0,15.0,>=50k
4,Self-emp-not-inc,7th-8th,Married-civ-spouse,Other-service,Wife,Black,True,42.0,82297.0,10.0,<50k
5,Private,HS-grad,Never-married,Handlers-cleaners,Own-child,White,False,20.0,63210.0,9.0,<50k
6,Private,Some-college,Divorced,#na#,Other-relative,White,False,49.0,44434.0,10.0,<50k
7,Private,11th,Married-civ-spouse,#na#,Husband,White,False,37.0,138940.0,7.0,<50k
8,Private,HS-grad,Married-civ-spouse,Craft-repair,Husband,White,False,46.0,328216.0,9.0,>=50k
9,Self-emp-inc,HS-grad,Married-civ-spouse,#na#,Husband,White,True,36.0,216711.0,10.0,>=50k


We can decode a row of transformed data by passing it to `to.decode_row`:

In [None]:
row = to.items.head().iloc[0]
to.decode_row(row)

age                                49.0
workclass                       Private
fnlwgt                    101320.000758
education                    Assoc-acdm
education-num                      12.0
marital-status       Married-civ-spouse
occupation                         #na#
relationship                       Wife
race                              White
sex                              Female
capital-gain                          0
capital-loss                       1902
hours-per-week                       40
native-country            United-States
salary                              NaN
_int_train_mask                    True
education-num_na                  False
Name: 0, dtype: object

We can make new test datasets based on the training data with `to.new()`, then call `process()` to apply the preprocessing to them:

:::{.callout-note}

Since machine learning models can't magically understand categories they were never trained on, the data should reflect this. If there are different missing values in your test data, you should address this before training.

:::

In [None]:
to_tst = to.new(ddf_test)
to_tst.process()
to_tst.items.head()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,_int_train_mask,education-num_na
10000,0.459802,5,1.338499,10,1.167323,3,2,1,2,Male,0,0,40,Philippines,True,1
10001,-0.935224,5,1.251829,12,-0.426158,3,15,1,4,Male,0,0,40,United-States,True,1
10002,1.047181,5,0.153621,2,-1.222898,1,9,2,5,Female,0,0,37,United-States,True,1
10003,0.533224,5,-0.277728,12,-0.426158,7,2,5,5,Female,0,0,43,United-States,True,1
10004,0.753491,6,1.440618,9,0.370583,3,5,1,5,Male,0,0,60,United-States,True,1


We can then convert it to a `DataLoader`:

In [None]:
tst_dl = dls.valid.new(to_tst)
tst_dl.show_batch()

Unnamed: 0,workclass,education,marital-status,occupation,relationship,race,education-num_na,age,fnlwgt,education-num
0,Private,Bachelors,Married-civ-spouse,Adm-clerical,Husband,Asian-Pac-Islander,False,45.0,338105.004776,13.0
1,Private,HS-grad,Married-civ-spouse,Transport-moving,Husband,Other,False,26.0,328662.996825,9.0
2,Private,11th,Divorced,Other-service,Not-in-family,White,False,53.000001,209021.999291,7.0
3,Private,HS-grad,Widowed,Adm-clerical,Unmarried,White,False,46.0,162029.999826,9.0
4,Self-emp-inc,Assoc-voc,Married-civ-spouse,Exec-managerial,Husband,White,False,49.0,349229.997941,11.0
5,Local-gov,Some-college,Married-civ-spouse,Exec-managerial,Husband,White,False,34.0,124827.002017,10.0
6,Self-emp-inc,Some-college,Married-civ-spouse,Sales,Husband,White,False,53.000001,290639.999727,10.0
7,Private,Some-college,Never-married,Sales,Own-child,White,False,19.0,106273.002695,10.0
8,Private,Some-college,Married-civ-spouse,Protective-serv,Husband,Black,False,72.0,53683.99721,10.0


## Other target types

### Multi-label categories

#### one-hot encoded label

In [None]:
def _mock_multi_label(df):
    sal,sex,white = [],[],[]
    for row in df.itertuples():
        sal.append(row.salary == '>=50k')
        sex.append(row.sex == ' Male')
        white.append(row.race == ' White')
    df['salary'] = np.array(sal)
    df['male']   = np.array(sex)
    df['white']  = np.array(white)
    return df

In [None]:
path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path/'adult.csv')
df_main,df_test = df.iloc[:10000].copy(),df.iloc[10000:].copy()
df_main = _mock_multi_label(df_main)
ddf_main, ddf_test = dd.from_pandas(df_main), dd.from_pandas(df_test)

In [None]:
ddf_main.head()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary,male,white
0,49,Private,101320,Assoc-acdm,12.0,Married-civ-spouse,,Wife,White,Female,0,1902,40,United-States,True,False,True
1,44,Private,236746,Masters,14.0,Divorced,Exec-managerial,Not-in-family,White,Male,10520,0,45,United-States,True,True,True
2,38,Private,96185,HS-grad,,Divorced,,Unmarried,Black,Female,0,0,32,United-States,False,False,False
3,38,Self-emp-inc,112847,Prof-school,15.0,Married-civ-spouse,Prof-specialty,Husband,Asian-Pac-Islander,Male,0,0,40,United-States,True,True,False
4,42,Self-emp-not-inc,82297,7th-8th,,Married-civ-spouse,Other-service,Wife,Black,Female,0,0,50,United-States,False,False,False


In [None]:
#|exporti
@EncodedMultiCategorize
def setups(self, to:Tabular):
    "Set the number of outputs `c` to the size of the vocab, then apply the transform to `to`"
    # NOTE(review): annotated with `Tabular`, whereas `encodes`/`decodes` in this cell use `TabularDask` - confirm this is intended
    self.c = len(self.vocab)
    return self(to)

# Encoding is a no-op for `TabularDask`: `to` is returned unchanged.
@EncodedMultiCategorize
def encodes(self, to:TabularDask): return to

@EncodedMultiCategorize
def decodes(self, to:TabularDask):
    "Turn the target columns `to.y_names` into booleans (`True` where the value equals 1) and return `to`"
    to.transform(to.y_names, lambda c: c==1)
    return to

In [None]:
cat_names = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race']
cont_names = ['age', 'fnlwgt', 'education-num']
procs = [DaskCategorify, DaskFillMissing, DaskNormalize]
y_names=["salary", "male", "white"]

In [None]:
%%time 
to = TabularDask(
    ddf_main, procs, cat_names, cont_names, y_names=y_names, y_block=MultiCategoryBlock(encoded=True, vocab=y_names),
    train_mask_func=get_random_train_mask
)

CPU times: user 976 ms, sys: 3.9 ms, total: 980 ms
Wall time: 978 ms


In [None]:
dls = to.dataloaders()
dls.valid.show_batch()



Unnamed: 0,workclass,education,marital-status,occupation,relationship,race,education-num_na,age,fnlwgt,education-num,salary,male,white
0,Private,Assoc-acdm,Married-civ-spouse,#na#,Wife,White,False,49.0,101320.001538,12.0,True,False,True
1,Private,HS-grad,Divorced,#na#,Unmarried,Black,True,38.0,96185.002688,10.0,False,False,False
2,Self-emp-not-inc,7th-8th,Married-civ-spouse,Other-service,Wife,Black,True,42.0,82296.993602,10.0,False,False,False
3,Private,HS-grad,Married-civ-spouse,Craft-repair,Husband,White,False,46.0,328215.996282,9.0,True,True,True
4,Self-emp-inc,HS-grad,Married-civ-spouse,#na#,Husband,White,True,36.0,216710.999942,10.0,True,True,True
5,Private,11th,Never-married,Adm-clerical,Own-child,White,True,18.0,216283.999891,10.0,False,False,True
6,Private,Masters,Never-married,#na#,Not-in-family,White,False,35.0,261293.002427,14.0,False,True,True
7,State-gov,Masters,Divorced,#na#,Not-in-family,White,False,56.0,274110.999163,14.0,False,True,True
8,Private,9th,Divorced,Sales,Not-in-family,White,True,46.0,117604.998708,10.0,False,True,True


In [None]:
#|hide
#### Not one-hot encoded

In [None]:
#|hide
def _mock_multi_label(df):
    targ = []
    for row in df.itertuples():
        labels = []
        if row.salary == '>=50k': labels.append('>50k')
        if row.sex == ' Male':   labels.append('male')
        if row.race == ' White': labels.append('white')
        targ.append(' '.join(labels))
    df['target'] = np.array(targ)
    return df

In [None]:
#|hide
path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path/'adult.csv')
df_main,df_test = df.iloc[:10000].copy(),df.iloc[10000:].copy()
df_main = _mock_multi_label(df_main)
ddf_main, ddf_test = dd.from_pandas(df_main), dd.from_pandas(df_test)

In [None]:
#|hide
ddf_main.head()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary,target
0,49,Private,101320,Assoc-acdm,12.0,Married-civ-spouse,,Wife,White,Female,0,1902,40,United-States,>=50k,>50k white
1,44,Private,236746,Masters,14.0,Divorced,Exec-managerial,Not-in-family,White,Male,10520,0,45,United-States,>=50k,>50k male white
2,38,Private,96185,HS-grad,,Divorced,,Unmarried,Black,Female,0,0,32,United-States,<50k,
3,38,Self-emp-inc,112847,Prof-school,15.0,Married-civ-spouse,Prof-specialty,Husband,Asian-Pac-Islander,Male,0,0,40,United-States,>=50k,>50k male
4,42,Self-emp-not-inc,82297,7th-8th,,Married-civ-spouse,Other-service,Wife,Black,Female,0,0,50,United-States,<50k,


In [None]:
#|hide
# @MultiCategorize
# def encodes(self, to:Tabular):
#     #to.transform(to.y_names, partial(_apply_cats, {n: self.vocab for n in to.y_names}, 0))
#     return to

# @MultiCategorize
# def decodes(self, to:Tabular):
#     #to.transform(to.y_names, partial(_decode_cats, {n: self.vocab for n in to.y_names}))
#     return to

In [None]:
#|hide
# cat_names = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race']
# cont_names = ['age', 'fnlwgt', 'education-num']
# procs = [Categorify, FillMissing, Normalize]
# splits = RandomSplitter()(range_of(df_main))

In [None]:
#|hide
# %time to = TabularPandas(df_main, procs, cat_names, cont_names, y_names="target", y_block=MultiCategoryBlock(), splits=splits)

In [None]:
#|hide
# to.procs[2].vocab

### Regression

In [None]:
path = untar_data(URLs.ADULT_SAMPLE)
df = pd.read_csv(path/'adult.csv')
# no mock labels are needed here: the regression target is the existing `age` column
df_main,df_test = df.iloc[:10000].copy(),df.iloc[10000:].copy()
ddf_main, ddf_test = dd.from_pandas(df_main), dd.from_pandas(df_test)

In [None]:
cat_names = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race']
cont_names = ['fnlwgt', 'education-num']
procs = [DaskCategorify, DaskFillMissing, DaskNormalize]

In [None]:
%%time
to = TabularDask(ddf_main, procs, cat_names, cont_names, y_names='age', train_mask_func=get_random_train_mask)

CPU times: user 802 ms, sys: 0 ns, total: 802 ms
Wall time: 802 ms


In [None]:
to.procs[-1].means

fnlwgt           192305.234512
education-num        10.090168
dtype: float64

In [None]:
dls = to.dataloaders()
dls.valid.show_batch()



Unnamed: 0,workclass,education,marital-status,occupation,relationship,race,education-num_na,fnlwgt,education-num,age
0,Private,HS-grad,Divorced,#na#,Unmarried,Black,True,96185.001708,10.0,38.0
1,Private,11th,Married-civ-spouse,#na#,Husband,White,False,138940.000061,7.0,37.0
2,Private,Assoc-voc,Married-civ-spouse,Sales,Husband,White,True,84661.00459,10.0,43.0
3,Private,HS-grad,Widowed,#na#,Unmarried,White,False,284328.998764,9.0,51.0
4,Private,Masters,Never-married,#na#,Not-in-family,White,False,261292.999689,14.0,35.0
5,Private,Some-college,Married-civ-spouse,#na#,Wife,Black,True,188942.000047,10.0,40.0
6,Private,HS-grad,Married-civ-spouse,Craft-repair,Husband,White,False,247294.003159,9.0,49.0
7,Self-emp-inc,Masters,Married-civ-spouse,Exec-managerial,Husband,White,False,222614.998466,14.0,55.0
8,Private,Assoc-acdm,Never-married,Adm-clerical,Not-in-family,White,True,353824.002456,10.0,46.0


## Export -

In [None]:
#|hide
from nbdev import nbdev_export
nbdev_export()