In [None]:
# default_exp core

# The foundations

> Let's try to replicate the standard usage of MNL frameworks

We will implement a basic MNL (multinomial logit) package that can be compared against Biogeme and similar tools. As a starting point, we follow this [blog post](https://aaronkub.com/2020/02/12/logistic-regression-with-pytorch.html) on logistic regression with PyTorch.

In [None]:
#export
import random

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from fastprogress import progress_bar, master_bar
from fastcore.all import *

First, grab some data: the Iris dataset, read from `./data/Iris.csv`.

In [None]:
# Load the Iris table and discard the `Id` column.
data = pd.read_csv("./data/Iris.csv").drop(columns="Id")

# Features: every column except the species label.
X_numpy = data.drop(columns="Species").values

# One integer code per species, numbered in order of first appearance.
target_map = dict(
    (species, code) for code, species in enumerate(data.Species.unique())
)
y_numpy = data.Species.map(target_map).values

X = torch.tensor(X_numpy, dtype=torch.float32)
y = torch.tensor(y_numpy)

In [None]:
# Preview the first rows: four measurement columns followed by the Species label.
data.head()

Unnamed: 0,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


In [None]:
# Label encoding: each species name maps to an integer code, in order of first appearance.
target_map

{'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2}

In [None]:
#export
def prepare_data(data, x_cols=None, target_col=None):
    """Turn a DataFrame into a float32 feature tensor and an integer-coded target tensor.

    `target_col` defaults to the last column and `x_cols` to every column; the target
    column is always removed from the features. Target values are numbered in order of
    first appearance. Everything is materialised eagerly (no lazy reading).
    Returns `(X, y)`.
    """
    all_columns = list(data.columns)
    if target_col is None:
        target_col = all_columns[-1]
    if x_cols is None:
        x_cols = all_columns
    feature_cols = [name for name in x_cols if name != target_col]

    labels = data.loc[:, target_col]
    # Distinct label -> integer code, in order of first appearance.
    codes = dict((label, code) for code, label in enumerate(labels.unique()))

    features = torch.tensor(data.loc[:, feature_cols].values, dtype=torch.float32)
    targets = torch.tensor(labels.map(codes).values)
    return features, targets

In [None]:
# Rebuild X and y through the exported helper; by default the last column is the target.
X, y = prepare_data(data)

In [None]:
# First five feature rows and their integer targets.
X[0:5], y[0:5]

(tensor([[5.1000, 3.5000, 1.4000, 0.2000],
         [4.9000, 3.0000, 1.4000, 0.2000],
         [4.7000, 3.2000, 1.3000, 0.2000],
         [4.6000, 3.1000, 1.5000, 0.2000],
         [5.0000, 3.6000, 1.4000, 0.2000]]),
 tensor([0, 0, 0, 0, 0]))

In [None]:
# Features and targets must have the same number of rows.
len(X), len(y)

(150, 150)

## Train/Validation split

In [None]:
# `random.shuffle` permutes its argument in place and returns None, so the shuffled
# list has to be held in a variable to be inspected.
_indices = range_of(X)
random.shuffle(_indices)
_indices[:10]

In [None]:
#export
def train_valid_split(X, y, pct=0.2, shuffle=True):
    """Split `X` and `y` into aligned train and validation parts.

    `pct` is the share of samples held out for validation (0 <= pct <= 1); the first
    `int(n * (1 - pct))` positions, after optional shuffling, go to train.
    With `shuffle=True` the order is permuted using the global `random` state.
    `X` and `y` must support indexing by a list of positions.
    Returns `X_train, y_train, X_valid, y_valid`.
    Raises `ValueError` if `pct` is outside [0, 1].
    """
    assert len(X) == len(y), "X and y don't have the same number of elements"
    if not 0 <= pct <= 1:
        raise ValueError(f"pct must be between 0 and 1, got {pct}")
    n = len(X)
    indices = list(range(n))
    if shuffle:
        random.shuffle(indices)
    # The epsilon stops float error (e.g. 10 * (1 - 0.9) == 0.9999999999999998)
    # from truncating one training sample away.
    n_train = int(n * (1 - pct) + 1e-9)
    train_idx, valid_idx = indices[:n_train], indices[n_train:]
    return X[train_idx], y[train_idx], X[valid_idx], y[valid_idx]

In [None]:
# Deterministic check of the default split: with shuffling off the order is preserved,
# so the first 8 items land in train and the last 2 in valid.
_X = L(list(range(10)))
_y = L(list('abcdefghij'))

test_eq(train_valid_split(_X, _y, shuffle=False)[0], list(range(8)))
test_eq(train_valid_split(_X, _y, shuffle=False)[3], ['i', 'j'])

## Model

In [None]:
#export
class LinearMNL(nn.Module):
    """A single linear layer mapping `in_dim` input features to `out_dim` scores.

    `forward` returns the raw output of the linear layer; no softmax is applied here.
    """

    def __init__(self, in_dim=4, out_dim=5, bias=False):
        super().__init__()
        store_attr()
        self.linear = nn.Linear(in_features=in_dim, out_features=out_dim, bias=bias)

    def forward(self, x):
        scores = self.linear(x)
        return scores

In [None]:
# Smoke test: 10 random rows of 4 features should come out as 10 rows of 3 scores.
model = LinearMNL(in_dim=4, out_dim=3)
x = torch.rand(10, 4)

model(x).shape

torch.Size([10, 3])

In [None]:
# Cross-entropy criterion; the same class is the default `loss_func` of `Learner`.
loss_func = nn.CrossEntropyLoss()

In [None]:
#export
class DataLoaders:
    """
    A class to store dataloaders (train/valid/test....)

    `train_dl` is required; `valid_dl` may be `None` when no validation data is used."""
    def __init__(self, train_dl, valid_dl=None):
        store_attr()

    def one_batch(self, dl=None):
        "Return the first batch of `dl` (defaults to the training loader)"
        dl = ifnone(dl, self.train_dl)
        return next(iter(dl))

    @delegates(DataLoader, but=['batch_size'])
    @classmethod
    def from_datasets(cls, train_ds, valid_ds=None, batch_size=1, **kwargs):
        """Wrap datasets in `DataLoader`s.

        The training loader uses `batch_size`; the validation loader (if any) uses
        `2*batch_size`. `kwargs` are passed unchanged to both `DataLoader`s.
        """
        train_dl = DataLoader(train_ds, batch_size=batch_size, **kwargs)
        valid_dl = None
        if valid_ds is not None:
            valid_dl = DataLoader(valid_ds, batch_size=2*batch_size, **kwargs)
        return cls(train_dl, valid_dl)

    @delegates(DataLoader, but=['batch_size'])
    @classmethod
    def from_Xy(cls, X, y, pct=None, batch_size=1, **kwargs):
        """Build loaders straight from feature/target tensors.

        When `pct` is given, `train_valid_split(X, y, pct)` provides a validation set;
        otherwise all data is used for training and `valid_dl` is `None`.
        `kwargs` are forwarded to `from_datasets`, and from there to `DataLoader`.
        """
        if pct is not None:
            X_train, y_train, X_valid, y_valid = train_valid_split(X, y, pct)
        else:
            X_train, y_train, X_valid, y_valid = X, y, None, None
        train_ds = TensorDataset(X_train, y_train)
        valid_ds = TensorDataset(X_valid, y_valid) if X_valid is not None else None
        # Forward **kwargs: previously they were accepted but silently dropped here.
        return cls.from_datasets(train_ds, valid_ds, batch_size=batch_size, **kwargs)
        

In [None]:
# Hold out 20% for validation; train batches have 8 rows, validation batches 16 (2 * batch_size).
dls = DataLoaders.from_Xy(X, y, pct=0.2, batch_size=8)

In [None]:
#export
class Learner:
    """A wrapper around dls, model and optimizer.

    `dls` must expose `train_dl` and `valid_dl` (the latter may be `None`).
    The optimizer is created by `fit` and stored on `self.optimizer`.
    """
    def __init__(self, dls, model, loss_func=torch.nn.CrossEntropyLoss()):
        store_attr()
        # Created by `fit`; None until then so early misuse gives a clear error.
        self.optimizer = None

    def train_one_epoch(self):
        """Run one optimisation pass over `dls.train_dl`.

        Returns the sum of the per-batch losses (not their mean).
        Raises `RuntimeError` if no optimizer has been created yet.
        """
        if getattr(self, 'optimizer', None) is None:
            raise RuntimeError("No optimizer available: call `fit` before `train_one_epoch`")
        self.model.train()
        accum_loss = 0.
        for x, y in self.dls.train_dl:
            pred = self.model(x)
            loss = self.loss_func(pred, y)

            # backprop
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            accum_loss += loss.item()
        return accum_loss

    def validate(self, dl=None):
        """Evaluate the model on `dl` (default: `dls.valid_dl`) without tracking gradients.

        Returns `(val_loss, accuracy)`: the sum of per-batch losses, and the fraction of
        samples in `dl.dataset` whose arg-max prediction equals the target.
        Returns the string 'No validation data' when there is nothing to evaluate on.
        """
        dl = ifnone(dl, self.dls.valid_dl)
        if dl is None:
            return 'No validation data'
        self.model.eval()
        val_loss, accu = 0, 0
        with torch.no_grad():
            for x, y in dl:
                pred = self.model(x)
                val_loss += self.loss_func(pred, y).item()
                accu += (pred.argmax(1) == y).type(torch.float).sum().item()
        return val_loss, accu / len(dl.dataset)

    def fit(self, n_epochs=10, lr=0.01, wd=0.01):
        """Train for `n_epochs` with SGD (`lr`, weight decay `wd`), printing one line per epoch.

        A new optimizer is created on every call. Validation metrics are reported only
        when `dls.valid_dl` exists.
        """
        self.optimizer = torch.optim.SGD(
            self.model.parameters(),
            lr=lr,
            weight_decay=wd
        )
        # `validate` returns a plain string when there is no validation loader, which
        # cannot be unpacked into (loss, accuracy); skip validation in that case.
        has_valid = self.dls.valid_dl is not None

        for epoch in progress_bar(range(n_epochs), leave=False):
            loss = self.train_one_epoch()
            msg = f'epoch = {epoch:3.0f}, train_loss = {loss:.3f}'
            if has_valid:
                val_loss, accuracy = self.validate()
                msg += f', val_loss = {val_loss:.3f}, accuracy = {accuracy:.2f}'
            print(msg)

In [None]:
# Fresh 4-feature, 3-class model, wrapped together with the loaders and the loss.
model = LinearMNL(in_dim=4, out_dim=3)
learn = Learner(dls=dls, model=model, loss_func=loss_func)

In [None]:
# Train for 20 epochs with `fit`'s defaults (lr=0.01, wd=0.01).
learn.fit(20)

epoch =   0, train_loss = 28.925, val_loss = 2.915, accuracy = 0.30
epoch =   1, train_loss = 20.222, val_loss = 2.608, accuracy = 0.30
epoch =   2, train_loss = 18.252, val_loss = 2.354, accuracy = 0.30
epoch =   3, train_loss = 16.640, val_loss = 2.134, accuracy = 0.33
epoch =   4, train_loss = 15.261, val_loss = 1.951, accuracy = 0.37
epoch =   5, train_loss = 14.102, val_loss = 1.800, accuracy = 0.43
epoch =   6, train_loss = 13.137, val_loss = 1.676, accuracy = 0.77
epoch =   7, train_loss = 12.337, val_loss = 1.574, accuracy = 0.80
epoch =   8, train_loss = 11.670, val_loss = 1.489, accuracy = 0.83
epoch =   9, train_loss = 11.112, val_loss = 1.418, accuracy = 0.83
epoch =  10, train_loss = 10.641, val_loss = 1.358, accuracy = 0.83
epoch =  11, train_loss = 10.239, val_loss = 1.307, accuracy = 0.83
epoch =  12, train_loss = 9.893, val_loss = 1.263, accuracy = 0.83
epoch =  13, train_loss = 9.593, val_loss = 1.225, accuracy = 0.83
epoch =  14, train_loss = 9.329, val_loss = 1.192,

## Export

In [None]:
# hide
# Convert the project's notebooks with nbdev (see the "Converted ..." lines in the output).
from nbdev.export import *
notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
Converted swissmetro.ipynb.
