In [1]:
%%writefile pipetorch.py

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
import copy

Overwriting pipetorch.py


In [2]:
%%writefile -a pipetorch.py

def to_numpy(X):
    """Return ``X`` as a NumPy array when it behaves like a torch tensor.

    Tensor-like inputs (anything supporting ``X.data.cpu().numpy()``) are
    detached from the graph, moved to the CPU and converted. Any other input
    (NumPy arrays, lists, DataFrames, ...) is returned unchanged.
    """
    try:
        return X.data.cpu().numpy()
    except AttributeError:
        # Not a tensor: there is no `.data.cpu()` chain to follow.
        return X

Appending to pipetorch.py


In [3]:
%%writefile -a pipetorch.py

class PTDataFrame(pd.DataFrame):
    """A pandas DataFrame that carries a train/valid/test split and column scalers.

    The last column is treated as the target. The split indices and the fitted
    scalers live in the attributes listed in ``_metadata``.
    """

    _metadata = ['_pt_scaler', '_pt_indices', '_train_indices', '_valid_indices', '_test_indices']

    @classmethod
    def read_csv(cls, *args, sep=',', **kwargs):
        """Read a CSV file with ``pandas.read_csv`` and return it as a PTDataFrame."""
        return cls(pd.read_csv(*args, sep=sep, **kwargs))

    @property
    def _constructor(self):
        # pandas uses this to build the result of operations on this frame.
        return PTDataFrame

    def _copy_meta(self, df):
        """Copy every metadata attribute that exists on ``self`` onto ``df``; return ``df``."""
        for attr in self._constructor._metadata:
            if hasattr(self, attr):
                df.__dict__[attr] = getattr(self, attr)
        return df

    def _check_list_attr(self, attr):
        """Return True when ``attr`` exists and holds a non-empty sized value."""
        try:
            return hasattr(self, attr) and len(getattr(self, attr)) > 0
        except TypeError:
            # The attribute is set but has no length (e.g. None).
            return False

    def _pt_scaler_exists(self):
        return self._check_list_attr('_pt_scaler')

    def _train_indices_exists(self):
        return self._check_list_attr('_train_indices')

    def to_array(self, columns=None, ycolumns=1, dtype=np.float32):
        """Convert (a selection of) the frame to a PTArray that keeps the split and scalers.

        columns: columns to convert, default=None means all columns.
        ycolumns: number of trailing columns that form the target.
        dtype: dtype of the resulting array.
        """
        df = self[columns] if columns else self
        X = PTArray(df.to_numpy().astype(dtype))
        X.__array_finalize__(self)
        # PTArray slices X/y on `_ycolumns`; `ycolumns` is set as well because
        # PTArray.polynomials reads that name.
        X._ycolumns = ycolumns
        X.ycolumns = ycolumns
        return X

    def to_dataset(self, columns=None, ycolumns=1, dtype=np.float32):
        """Shortcut for ``to_array(...).to_dataset()``."""
        return self.to_array(columns, ycolumns, dtype).to_dataset()

    def to_arrays(self, columns=None, ycolumns=1, dtype=np.float32):
        """Shortcut for ``to_array(...).to_arrays()``."""
        return self.to_array(columns, ycolumns, dtype).to_arrays()

    @property
    def pt_indices(self):
        return self._pt_indices

    @property
    def train_indices(self):
        return self._train_indices

    @property
    def valid_indices(self):
        return self._valid_indices

    @property
    def test_indices(self):
        return self._test_indices

    @property
    def pt_scaler(self):
        return self._pt_scaler

    # The setters write straight into __dict__, bypassing DataFrame attribute assignment.
    @pt_indices.setter
    def pt_indices(self, value):
        self.__dict__['_pt_indices'] = value

    @train_indices.setter
    def train_indices(self, value):
        self.__dict__['_train_indices'] = value

    @valid_indices.setter
    def valid_indices(self, value):
        self.__dict__['_valid_indices'] = value

    @test_indices.setter
    def test_indices(self, value):
        self.__dict__['_test_indices'] = value

    @pt_scaler.setter
    def pt_scaler(self, value):
        self.__dict__['_pt_scaler'] = value

    def split(self, split=0.2, shuffle=True):
        """Return a copy with train/valid(/test) indices assigned.

        Only rows whose last column is not null take part in the split.
        split: the validation fraction, or a (valid, test) pair of fractions.
        shuffle: shuffle the rows before splitting.
        """
        r = copy.deepcopy(self)
        r.pt_indices = np.where(r.iloc[:, -1].notnull())[0]
        if shuffle:
            np.random.shuffle(r.pt_indices)
        r.train_indices, r.valid_indices, r.test_indices = r._split_indices(r.pt_indices, split)
        return r

    def polynomials(self, degree):
        """Shortcut for ``to_array().polynomials(degree)``."""
        return self.to_array().polynomials(degree)

    def scale(self, columns=True, scalertype=StandardScaler, omit_interval=(-2, 2)):
        """Return a scaled copy; the fitted scalers are kept in ``pt_scaler``.

        columns: True scales the columns picked by ``_guess_columns``, False
            scales nothing, otherwise a collection of column names to scale.
        scalertype: scaler class with the scikit-learn fit/transform interface.
        omit_interval: passed to ``_guess_columns`` when ``columns`` is True.

        The scalers are fitted on the training rows only, so validation and
        test rows do not influence the scaling parameters.
        """
        assert self._train_indices_exists(), "Split the DataFrame before scaling!"
        assert not self._pt_scaler_exists(), "Trying to scale twice, which is a really bad idea!"
        if columns is True:
            columns = self._guess_columns(omit_interval=omit_interval)
        elif columns is False:
            columns = []
        train = self.iloc[self.train_indices]
        r = copy.deepcopy(self)
        r.pt_scaler = tuple(
            self._create_scaler(scalertype, s) if c in columns else None
            for c, s in train.items()
        )
        return r.transform(self)

    def _guess_columns(self, omit_interval=(-2, 2)):
        """Names of the float columns with at least one value outside ``omit_interval``."""
        low, high = omit_interval[0], omit_interval[1]
        return [
            c for c, series in self.items()
            if series.dtype.kind == 'f' and (series.min() < low or series.max() > high)
        ]

    @staticmethod
    def _create_scaler(scalertype, series):
        """Fit a new ``scalertype()`` on ``series`` (1-D input is used as one column)."""
        scaler = scalertype()
        if len(series.shape) == 1:
            series = series.to_numpy().reshape(-1, 1)
        scaler.fit(series)
        return scaler

    def transform(self, df):
        """Apply the stored scalers to ``df`` column by column.

        Columns whose scaler is None are passed through unchanged. The result
        carries the metadata (split and scalers) of ``self``.
        """
        out = []
        for (c, series), scaler in zip(df.items(), self.pt_scaler):
            if scaler is not None:
                scaled = scaler.transform(series.to_numpy().reshape(-1, 1))
                out.append(pd.Series(scaled.reshape(-1), index=df.index, name=c))
            else:
                out.append(series)
        return self._copy_meta(self._constructor(pd.concat(out, axis=1)))

    def inverse_transform_y(self, y):
        """Undo the scaling of target values ``y`` (array or tensor).

        Returns ``y`` unchanged when the target column was not scaled. 1-D
        input gives 1-D output.
        """
        yscaler = self.pt_scaler[-1]
        if yscaler is None:
            return y
        y = np.asarray(to_numpy(y))
        if len(y.shape) == 1:
            return yscaler.inverse_transform(y.reshape(-1, 1)).reshape(-1)
        return yscaler.inverse_transform(y)

    def inverse_transform_X(self, X):
        """Undo the scaling of a 2-D feature matrix ``X`` (array or tensor).

        Returns a DataFrame whose columns are named after the feature columns
        of this frame (all columns but the last).
        """
        X = np.asarray(to_numpy(X))
        restored = []
        for i, name, scaler in zip(range(X.shape[1]), self.columns[:-1], self.pt_scaler):
            column = X[:, i]
            if scaler is not None:
                # The scaler is applied to a single column of shape (n, 1).
                column = scaler.inverse_transform(column.reshape(-1, 1)).reshape(-1)
            restored.append(pd.Series(column, name=name))
        return pd.concat(restored, axis=1)

    def inverse_transform_x(self, X):
        """Alias of ``inverse_transform_X``."""
        return self.inverse_transform_X(X)

    def inverse_transform(self, X, y):
        """Undo the scaling of ``X`` and ``y`` and return them as one DataFrame."""
        y = np.asarray(to_numpy(self.inverse_transform_y(y))).reshape(-1)
        X = self.inverse_transform_X(X)
        return pd.concat([X, pd.Series(y, name=self.columns[-1])], axis=1)

    def _train_labels(self):
        """Target values (last column) of the training rows, as a 1-D array."""
        return self.iloc[self.train_indices, -1].to_numpy()

    def _oversample_train(self, labels, target_counts):
        """Extend ``train_indices`` in place so class ``c`` occurs ``target_counts[c]`` times.

        labels: 1-D array with the target value of every current training row.
        Classes that already meet their target count are left untouched.
        """
        extra = []
        for label, target in target_counts.items():
            positions = np.where(labels == label)[0]
            shortfall = max(int(target) - len(positions), 0)
            extra.append(np.random.choice(positions, shortfall, replace=True))
        # Resampled rows first, followed by every original training row.
        picks = np.hstack(extra + [np.arange(len(labels))]).astype(int)
        self.train_indices = np.asarray(self.train_indices)[picks]
        return self

    def balance(self, weights=None):
        """Oversample the training rows in place to reach a given class ratio.

        weights: dict mapping every class label to its relative weight;
            None balances all classes equally (see ``balance_y_equal``).
        Returns ``self``.
        """
        assert self._train_indices_exists(), "You have to split the DataFrame first"
        if weights is None:
            return self.balance_y_equal()
        labels = self._train_labels()
        counts = dict(zip(*np.unique(labels, return_counts=True)))
        # Smallest unit size for which no class has to be downsampled.
        n = max(int(np.ceil(counts[c] / w)) for c, w in weights.items())
        targets = {c: int(round(n * weights[c])) for c in counts}
        return self._oversample_train(labels, targets)

    def balance_y_equal(self):
        """Oversample the training rows in place until every class is as large as the largest."""
        assert self._train_indices_exists(), "You have to split the DataFrame first"
        labels = self._train_labels()
        counts = dict(zip(*np.unique(labels, return_counts=True)))
        largest = max(counts.values())
        return self._oversample_train(labels, {c: largest for c in counts})

    @classmethod
    def _split_indices(cls, indices, split):
        """Cut ``indices`` into train, valid and test parts.

        split: a single validation fraction (the test part is then an empty
            list), or a (valid, test) pair of fractions.
        """
        length = len(indices)
        if np.ndim(split) == 0:
            train_length = int((1 - split) * length)
            assert train_length > 0, 'Non positive size of the training set, provide fraction for valid part, smaller than 1, e.g. 0.2'
            assert train_length < length, 'Non positive size of the validation set, provide fraction for valid part, bigger than 0, e.g. 0.2'
            return indices[:train_length], indices[train_length:], []
        train_length = int((1 - sum(split)) * length)
        train_valid_length = int((1 - split[1]) * length)
        assert train_length > 0, 'Non positive size of the training set, provide fractions for valid/test part, e.g. (0.2, 0.3)'
        assert train_valid_length > train_length, 'Non positive size of the validation set, provide fraction for valid part, bigger than 0, e.g. e.g. (0.2, 0.3)'
        assert length >= train_valid_length, 'Negative fraction of the test set, provide fractions for valid/test part, e.g. (0.2, 0.3)'
        train_indices = indices[:train_length]
        valid_indices = indices[train_length:train_valid_length]
        test_indices = indices[train_valid_length:]
        return train_indices, valid_indices, test_indices

Appending to pipetorch.py


In [4]:
%%writefile -a pipetorch.py

class PTArray(np.ndarray):
    """NumPy array that carries a train/valid/test split, column scalers and the target width.

    The last ``_ycolumns`` columns are the target; all other columns are features.
    """

    _metadata = ['_pt_scaler', '_pt_indices', '_train_indices', '_valid_indices', '_test_indices', '_ycolumns']

    def __new__(cls, input_array):
        return np.asarray(input_array).view(cls)

    def __array_finalize__(self, obj) -> None:
        if obj is None: return
        self._ycolumns = 1
        # Inherit whatever metadata the source object carries.
        d = {a: getattr(obj, a) for a in self._metadata if hasattr(obj, a)}
        self.__dict__.update(d)

    def __array_function__(self, func, types, args, kwargs):
        result = super().__array_function__(func, types, args, kwargs)
        # Only array results are re-wrapped; tuples, scalars and NotImplemented
        # (e.g. from np.where, np.unique(..., return_counts=True), np.shape)
        # must be handed back untouched.
        if isinstance(result, np.ndarray):
            return self._wrap(result)
        return result

    def __array_ufunc__(self, ufunc, method, *inputs, out=None, **kwargs):
        def cast(i):
            return i.view(np.ndarray) if isinstance(i, PTArray) else i

        inputs = tuple(cast(i) for i in inputs)
        if out is not None:
            # Output arrays have to be unwrapped too, otherwise in-place
            # operators such as `a += 1` are rejected by ndarray.__array_ufunc__.
            kwargs['out'] = tuple(cast(o) for o in out)
        result = super().__array_ufunc__(ufunc, method, *inputs, **kwargs)
        if result is NotImplemented or method == 'at':
            return result
        results = result if isinstance(result, tuple) else (result,)
        outputs = out if out is not None else (None,) * len(results)
        # Hand back the caller's own output objects where they were supplied.
        wrapped = tuple(self._wrap(r) if o is None else o for r, o in zip(results, outputs))
        return wrapped[0] if len(wrapped) == 1 else wrapped

    @property
    def ycolumns(self):
        """Number of trailing columns that form the target."""
        return self._ycolumns

    @ycolumns.setter
    def ycolumns(self, value):
        self._ycolumns = value

    def _check_list_attr(self, attr):
        """Return True when ``attr`` exists and holds a non-empty sized value."""
        try:
            return hasattr(self, attr) and len(getattr(self, attr)) > 0
        except TypeError:
            # The attribute is set but has no length (e.g. None).
            return False

    def _pt_scaler_exists(self):
        return self._check_list_attr('_pt_scaler')

    def _test_indices_exists(self):
        return self._check_list_attr('_test_indices')

    def _train_indices_exists(self):
        return self._check_list_attr('_train_indices')

    def _wrap(self, a):
        """View ``a`` as a PTArray that shares all attributes of ``self``."""
        a = PTArray(a)
        a.__dict__.update(self.__dict__)
        return a

    def polynomials(self, degree):
        """Return a new array whose features are expanded to polynomials of ``degree``.

        The target columns are appended unchanged. Must be called before scaling.
        """
        assert not self._pt_scaler_exists(), "Run polynomials before scaling"
        poly = PolynomialFeatures(degree, include_bias=False)
        features = poly.fit_transform(np.asarray(self[:, :-self._ycolumns]))
        targets = np.asarray(self[:, -self._ycolumns:])
        return self._wrap(np.concatenate([features, targets], axis=1))

    def to_arrays(self, ycolumns=None, dtype=np.float32):
        """Return train_X, valid_X, [test_X,] train_y, valid_y[, test_y] cast to ``dtype``.

        The test parts are only included when test indices are present.
        ycolumns: when given, replaces the number of target columns on this array.
        """
        if ycolumns:
            self._ycolumns = ycolumns
        a = self._wrap(self.astype(dtype))
        if self._test_indices_exists():
            return a.train_X, a.valid_X, a.test_X, a.train_y, a.valid_y, a.test_y
        return a.train_X, a.valid_X, a.train_y, a.valid_y

    def scale(self, scalertype=StandardScaler):
        """Return a scaled copy; one scaler per column, fitted on the training rows."""
        assert self._train_indices_exists(), "Split the DataFrame before scaling!"
        assert not self._pt_scaler_exists(), "Trying to scale twice, which is a really bad idea!"
        r = self._wrap(copy.deepcopy(self))
        r._pt_scaler = tuple(self._create_scaler(scalertype, column) for column in self[self._train_indices].T)
        return r.transform(self)

    @staticmethod
    def _create_scaler(scalertype, column):
        """Fit a new ``scalertype()`` on a single column."""
        scaler = scalertype()
        scaler.fit(np.asarray(column).reshape(-1, 1))
        return scaler

    @staticmethod
    def _inverse_column(column, scaler):
        """Return ``column`` as shape (n, 1), inverse-scaled unless ``scaler`` is None."""
        column = np.asarray(column).reshape(-1, 1)
        return column if scaler is None else scaler.inverse_transform(column)

    def transform(self, array):
        """Apply the stored scalers to ``array`` column by column (None = pass through)."""
        out = []
        for column, scaler in zip(np.asarray(array).T, self._pt_scaler):
            # Every column is handled as (n, 1) so that scaled and unscaled
            # columns can be concatenated side by side.
            column = column.reshape(-1, 1)
            out.append(column if scaler is None else scaler.transform(column))
        return self._wrap(np.concatenate(out, axis=1))

    def inverse_transform_y(self, y):
        """Undo the scaling of target values ``y`` (array or tensor).

        Returns a 1-D array for a single target column, otherwise an array of
        shape (n, ycolumns).
        """
        y = np.asarray(to_numpy(y)).reshape(-1, self._ycolumns)
        scalers = self._pt_scaler[-self._ycolumns:]
        out = [self._inverse_column(y[:, i], scaler) for i, scaler in enumerate(scalers)]
        if len(out) == 1:
            return self._wrap(out[0].reshape(-1))
        return self._wrap(np.concatenate(out, axis=1))

    def inverse_transform_X(self, X):
        """Undo the scaling of a 2-D feature matrix ``X`` (array or tensor)."""
        X = np.asarray(to_numpy(X))
        out = [self._inverse_column(X[:, i], self._pt_scaler[i]) for i in range(X.shape[1])]
        return self._wrap(np.concatenate(out, axis=1))

    def inverse_transform(self, X, y):
        """Undo the scaling of ``X`` and ``y`` and return them side by side as one array."""
        y = np.asarray(self.inverse_transform_y(y)).reshape(-1, self._ycolumns)
        X = np.asarray(self.inverse_transform_X(X))
        return self._wrap(np.concatenate([X, y], axis=1))

    def to_dataset(self):
        """
        returns: a list with a train, valid and (when test indices are present) test TensorDataset.
        Every dataset contains an X and y, where X holds all columns but the last ``_ycolumns``
        and y holds the last ``_ycolumns`` columns. The rows are the same as in
        train_X/valid_X/test_X.
        """
        import torch
        from torch.utils.data import TensorDataset

        def dataset(X, y):
            return TensorDataset(torch.from_numpy(np.asarray(X)), torch.from_numpy(np.asarray(y)))

        p = [dataset(self.train_X, self.train_y), dataset(self.valid_X, self.valid_y)]
        if self._test_indices_exists():
            p.append(dataset(self.test_X, self.test_y))
        return p

    def to_databunch(self, batch_size=32, num_workers=0, shuffle=True, pin_memory=False, balance=False):
        """
        returns: a Databunch that contains dataloaders for the train, valid and test part.
        batch_size, num_workers, shuffle, pin_memory: see Databunch/Dataloader constructor
        """
        ds = self.to_dataset()
        return Databunch(*ds, batch_size=batch_size, num_workers=num_workers, shuffle=shuffle, pin_memory=pin_memory, scaler=self, balance=balance)

    @property
    def X(self):
        return self[self._pt_indices, :-self._ycolumns]

    @property
    def y(self):
        return self[self._pt_indices, -self._ycolumns:]

    @property
    def train_X(self):
        return self[self._train_indices, :-self._ycolumns]

    @property
    def valid_X(self):
        return self[self._valid_indices, :-self._ycolumns]

    @property
    def test_X(self):
        return self[self._test_indices, :-self._ycolumns]

    @property
    def train_y(self):
        return self[self._train_indices, -self._ycolumns:]

    @property
    def valid_y(self):
        return self[self._valid_indices, -self._ycolumns:]

    @property
    def test_y(self):
        return self[self._test_indices, -self._ycolumns:]
        

Appending to pipetorch.py


In [5]:
%%writefile -a pipetorch.py

class Databunch:
    """Bundle of train/valid/test datasets with lazily created DataLoaders.

    train_ds, valid_ds, test_ds: datasets exposing ``.tensors`` as (X, y).
    batch_size: training batch size, capped at the size of the training set.
    num_workers, shuffle, pin_memory: passed on to the DataLoaders.
    scaler: object with an ``inverse_transform_y`` method.
    balance: False/None for plain sampling, True to weight every training
        sample by the inverse frequency of its class, or a dict mapping
        class label to sampling weight.
    """

    def __init__(self, train_ds, valid_ds, test_ds=None, batch_size=32, num_workers=0, shuffle=True, pin_memory=False, scaler=None, balance=False):
        self.train_ds = train_ds
        self.valid_ds = valid_ds
        self.test_ds = test_ds
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.shuffle = shuffle
        self.pin_memory = pin_memory
        self.scaler = scaler
        self.balance = balance

    @property
    def train_dl(self):
        """DataLoader over the training set, created on first access."""
        try:
            return self._train_dl
        except AttributeError:
            from torch.utils.data import DataLoader

            balanced = self.balance is not None and self.balance is not False
            sampler = self._weighted_sampler(self.balance) if balanced else None
            # DataLoader does not accept shuffle together with a sampler.
            shuffle = False if balanced else self.shuffle
            self._train_dl = DataLoader(self.train_ds, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=shuffle, pin_memory=self.pin_memory, sampler=sampler)
            return self._train_dl

    @property
    def valid_dl(self):
        """DataLoader that yields the whole validation set as a single batch."""
        try:
            return self._valid_dl
        except AttributeError:
            from torch.utils.data import DataLoader

            self._valid_dl = DataLoader(self.valid_ds, batch_size=len(self.valid_ds), num_workers=self.num_workers, shuffle=False, pin_memory=self.pin_memory)
            return self._valid_dl

    @property
    def test_dl(self):
        """DataLoader that yields the whole test set as a single batch."""
        try:
            return self._test_dl
        except AttributeError:
            from torch.utils.data import DataLoader

            self._test_dl = DataLoader(self.test_ds, batch_size=len(self.test_ds), num_workers=self.num_workers, shuffle=False, pin_memory=self.pin_memory)
            return self._test_dl

    def reset(self):
        """Drop the cached DataLoaders so they are rebuilt with the current settings."""
        for name in ('_valid_dl', '_train_dl', '_test_dl'):
            self.__dict__.pop(name, None)

    def _weighted_sampler(self, weights):
        """Build a WeightedRandomSampler over the training set.

        weights: True for inverse class-frequency weights, or a mapping from
            class label to weight.
        """
        import torch
        from torch.utils.data import WeightedRandomSampler

        target = np.atleast_1d(self.train_ds.tensors[1].numpy().squeeze())
        if isinstance(weights, (bool, np.bool_)):
            weights = {t: (1. / c) for t, c in zip(*np.unique(target, return_counts=True))}
        samples_weight = np.array([weights[t] for t in target])
        samples_weight = torch.from_numpy(samples_weight)
        return WeightedRandomSampler(samples_weight, len(samples_weight))

    @property
    def batch_size(self):
        return self._batch_size

    @batch_size.setter
    def batch_size(self, value):
        # Never ask for a batch larger than the training set.
        self._batch_size = min(value, len(self.train_ds))
        self.reset()

    @property
    def num_workers(self):
        return self._num_workers

    @num_workers.setter
    def num_workers(self, value):
        self._num_workers = value
        self.reset()

    def inverse_transform_y(self, y):
        """Delegate to ``scaler.inverse_transform_y``."""
        return self.scaler.inverse_transform_y(y)

    def sample(self, device=None):
        """Return the first batch of the training DataLoader, optionally moved to ``device``."""
        arrays = next(iter(self.train_dl))
        if device is not None:
            arrays = [a.to(device) for a in arrays]
        return arrays

    @property
    def train_X(self):
        return self.train_ds.tensors[0]

    @property
    def valid_X(self):
        return self.valid_ds.tensors[0]

    @property
    def test_X(self):
        return self.test_ds.tensors[0]

    @property
    def train_y(self):
        return self.train_ds.tensors[1]

    @property
    def valid_y(self):
        return self.valid_ds.tensors[1]

    @property
    def test_y(self):
        return self.test_ds.tensors[1]

Appending to pipetorch.py


In [6]:
from pipetorch import *

In [7]:
# Load the red-wine quality data into a PTDataFrame; the file is semicolon-separated.
df = PTDataFrame.read_csv('/data/datasets/winequality-red.csv', sep=';')

In [8]:
# Turn the target into a boolean label: True when quality is 6 or higher.
df['quality'] = (df.quality >= 6)

In [9]:
from sklearn.linear_model import LogisticRegression

In [10]:
from sklearn.metrics import f1_score

In [11]:
# Keep two features plus the target (last column), hold out 20% for validation,
# scale, and unpack into float32 train/valid arrays.
train_X, valid_X, train_y, valid_y = df[['pH', 'alcohol', 'quality']].split(0.2).scale().to_arrays()

In [12]:
model = LogisticRegression()
# train_y is a column vector of shape (n, 1); scikit-learn expects a 1-D target,
# so flatten it instead of relying on the implicit conversion (which warns).
model.fit(train_X, train_y.ravel())

  return f(**kwargs)


LogisticRegression()

In [13]:
# Predict class labels for the validation features.
pred_y = model.predict(valid_X)

In [14]:
pred_y

array([0., 1., 1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 1., 0., 1., 1., 1.,
       1., 0., 1., 1., 0., 0., 1., 1., 0., 0., 1., 1., 1., 1., 0., 1., 1.,
       1., 1., 0., 0., 1., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0.,
       1., 1., 1., 0., 1., 1., 0., 0., 0., 0., 0., 0., 1., 0., 1., 0., 0.,
       1., 1., 0., 1., 1., 1., 1., 1., 0., 0., 1., 1., 1., 0., 1., 0., 1.,
       1., 0., 0., 1., 0., 1., 0., 0., 0., 1., 0., 1., 0., 0., 1., 0., 0.,
       0., 1., 0., 0., 1., 0., 1., 1., 0., 0., 1., 0., 0., 0., 0., 1., 0.,
       1., 0., 0., 0., 1., 1., 0., 0., 1., 0., 0., 0., 0., 0., 0., 1., 1.,
       1., 0., 1., 0., 0., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1.,
       0., 1., 0., 0., 0., 0., 1., 0., 1., 1., 0., 1., 0., 1., 1., 0., 1.,
       0., 1., 0., 0., 1., 0., 1., 1., 1., 0., 1., 0., 1., 0., 1., 0., 0.,
       0., 1., 1., 1., 1., 1., 0., 0., 0., 1., 1., 0., 1., 1., 0., 0., 1.,
       1., 1., 0., 1., 0., 1., 1., 1., 1., 0., 0., 1., 0., 1., 0., 1., 1.,
       0., 0., 1., 0., 0.

In [15]:
# F1 score of the predictions against the validation targets.
f1_score(valid_y, pred_y)

0.676829268292683

In [16]:
# Same column selection, 20% validation split and scaling as before, this time
# converted to a PTArray and wrapped in a Databunch. split() shuffles by default,
# so the rows in each part differ from the earlier split.
db = df[['pH', 'alcohol', 'quality']].split(0.2).scale().to_array().to_databunch()

In [17]:
from dl2.tabular import *

using gpu 4


In [18]:
class Wine(nn.Module):
    """Single linear layer (2 inputs, 1 output) followed by a sigmoid."""

    def __init__(self):
        super().__init__()
        self.w1 = nn.Linear(in_features=2, out_features=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, X):
        """Map a batch of 2-feature rows to one value in (0, 1) per row."""
        logits = self.w1(X)
        return self.sigmoid(logits)
    

In [19]:
# Fresh, untrained instance of the Wine network.
model = Wine()

In [20]:
# Build a trainer for the model on the databunch with binary cross-entropy loss.
# NOTE(review): `trainer`, `loss` and `f1` are not defined in this notebook;
# presumably they come from `from dl2.tabular import *` - confirm.
t = trainer(model, db, loss=nn.BCELoss(), metrics=[loss, f1])

In [21]:
# NOTE(review): the log below numbers its lines per epoch, so 200 is presumably
# the number of epochs and `lr` a pair of learning rates - confirm against the
# dl2 trainer API, which is not shown here.
t.train(200, lr=[1e-3, 3e-3])

HBox(children=(HTML(value='Total'), FloatProgress(value=0.0, max=320000.0), HTML(value='')))

1 0.11s train loss: 0.645604  f1: 0.524691 valid loss: 0.629726  f1: 0.597122
2 0.04s train loss: 0.635023  f1: 0.570588 valid loss: 0.618584  f1: 0.627178
3 0.04s train loss: 0.625190  f1: 0.607714 valid loss: 0.610255  f1: 0.660000
4 0.04s train loss: 0.617749  f1: 0.629964 valid loss: 0.603269  f1: 0.683871
5 0.04s train loss: 0.611546  f1: 0.637885 valid loss: 0.597957  f1: 0.700315
6 0.03s train loss: 0.606604  f1: 0.651646 valid loss: 0.593139  f1: 0.712500
7 0.03s train loss: 0.602177  f1: 0.658662 valid loss: 0.589824  f1: 0.727829
8 0.04s train loss: 0.598938  f1: 0.670025 valid loss: 0.586538  f1: 0.728916
9 0.03s train loss: 0.596044  f1: 0.676056 valid loss: 0.584011  f1: 0.734328
10 0.03s train loss: 0.593873  f1: 0.683648 valid loss: 0.581829  f1: 0.735294
11 0.04s train loss: 0.592093  f1: 0.679643 valid loss: 0.579499  f1: 0.735294
12 0.03s train loss: 0.590293  f1: 0.684932 valid loss: 0.577873  f1: 0.742690
13 0.04s train loss: 0.588942  f1: 0.684380 valid loss: 0.577

109 0.04s train loss: 0.581042  f1: 0.710324 valid loss: 0.565453  f1: 0.765363
110 0.04s train loss: 0.581027  f1: 0.710324 valid loss: 0.565640  f1: 0.765363
111 0.04s train loss: 0.581057  f1: 0.710324 valid loss: 0.565414  f1: 0.765363
112 0.03s train loss: 0.581049  f1: 0.710769 valid loss: 0.565758  f1: 0.765363
113 0.04s train loss: 0.581029  f1: 0.711760 valid loss: 0.565855  f1: 0.765363
114 0.03s train loss: 0.581026  f1: 0.710769 valid loss: 0.565289  f1: 0.765363
115 0.03s train loss: 0.581035  f1: 0.709877 valid loss: 0.565261  f1: 0.767507
116 0.04s train loss: 0.581081  f1: 0.709777 valid loss: 0.565663  f1: 0.765363
117 0.03s train loss: 0.581112  f1: 0.709527 valid loss: 0.566191  f1: 0.765363
118 0.04s train loss: 0.581077  f1: 0.709077 valid loss: 0.566557  f1: 0.765363
119 0.03s train loss: 0.581146  f1: 0.710324 valid loss: 0.565812  f1: 0.765363
120 0.04s train loss: 0.581032  f1: 0.709877 valid loss: 0.566123  f1: 0.765363
121 0.04s train loss: 0.581060  f1: 0.71