In [None]:
# default_exp core

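# core

> Two small exported helper functions, followed by a worked example: training an MLP on the spiral dataset with PyTorch and plotting its loss landscape.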

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
def hello_nbdev(d):
    """Print `d` to standard output.

    Args:
        d: Any object; it is handed to `print` unchanged.

    Returns:
        None.
    """
    print(d)

def helloworld():
    """Print the one-character string ``'d'`` to standard output.

    Takes no arguments and returns None.
    """
    # NOTE(review): the printed text is the literal 'd', not a greeting —
    # confirm this is the intended output.
    print('d')

In [None]:
import urllib
import urllib.request

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# Local filename under which the downloaded dataset is stored
DATA_FILE = "spiral.csv"
# Single seed value shared by the seeded random number generators in this notebook
SEED = 180224
# Seed NumPy's global RNG so runs are repeatable
np.random.seed(SEED)

In [None]:
# Download the spiral dataset from GitHub and save it as DATA_FILE in the
# notebook's working directory.
url = "https://raw.githubusercontent.com/madewithml/lessons/master/data/spiral.csv"
# `urllib.request` is imported explicitly in the imports cell: a bare
# `import urllib` does not guarantee that the submodule is loaded.
# The context manager closes the HTTP response even if reading fails.
with urllib.request.urlopen(url) as response:
    html = response.read()  # raw bytes of the CSV file (name kept for compatibility)
with open(DATA_FILE, 'wb') as fp:
    fp.write(html)

In [None]:
# Read the CSV written by the download cell; row 0 supplies the column names
df = pd.read_csv(DATA_FILE, header=0)
# Features come from the X1/X2 columns, labels from the `color` column
X = df[['X1', 'X2']].values
y = df['color'].values
# Preview the first five rows
df.head(5)

In [None]:
# Report the dimensions of the feature and label arrays
print ("X: ", X.shape)
print ("y: ", y.shape)

In [None]:
# Display the raw label array (class names such as 'c1', still un-encoded here)
y

In [None]:
# Scatter plot of the two features, one colour per class label
colors = {'c1': 'red', 'c2': 'yellow', 'c3': 'blue'}
point_colors = [colors[label] for label in y]
plt.title("Generated non-linear data")
plt.scatter(X[:, 0], X[:, 1], c=point_colors, edgecolors='k', s=25)
plt.show()

In [None]:
import collections
from sklearn.model_selection import train_test_split

In [None]:
# Whether samples are shuffled before being split
SHUFFLE = True
# Split proportions (TRAIN_SIZE is not referenced by any visible cell)
TRAIN_SIZE, VAL_SIZE, TEST_SIZE = 0.7, 0.15, 0.15

In [None]:
def train_val_test_split(X, y, val_size, test_size, shuffle):
    """Split data into train/val/test datasets.

    The test set is carved off first, then the validation set is taken from
    what remains.

    Args:
        X: Feature array; the first axis indexes samples.
        y: Label array aligned with `X`; also used for stratification.
        val_size: Size of the validation set. A float strictly between 0 and 1
            is a fraction of the *full* dataset; any other value (e.g. an int
            sample count) is forwarded to scikit-learn unchanged.
        test_size: Size of the test set, forwarded to scikit-learn unchanged
            (float fraction of the full dataset or int sample count).
        shuffle: Whether to shuffle before splitting. Stratification by label
            is applied only when this is true, because scikit-learn rejects
            `stratify` combined with `shuffle=False`.

    Returns:
        Tuple `(X_train, X_val, X_test, y_train, y_val, y_test)`.
    """
    stratify_all = y if shuffle else None
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=stratify_all, shuffle=shuffle)

    # The second split only sees the samples left after removing the test set,
    # so passing the fraction through as-is would shrink the validation set
    # (e.g. 0.15 of the remaining 85% is 12.75% of the data). Convert the
    # fraction to an absolute count measured against the full dataset instead.
    if isinstance(val_size, float) and 0.0 < val_size < 1.0:
        val_size = max(1, int(round(val_size * len(y))))

    stratify_rest = y_train if shuffle else None
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=val_size, stratify=stratify_rest, shuffle=shuffle)
    return X_train, X_val, X_test, y_train, y_val, y_test

In [None]:
# Partition the dataset, then report the size of every split
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
    X=X, y=y, val_size=VAL_SIZE, test_size=TEST_SIZE, shuffle=SHUFFLE)
# Label frequencies over the whole (unsplit) dataset
class_counts = dict(collections.Counter(y))
for split_name, split_X, split_y in (
        ("train", X_train, y_train),
        ("val", X_val, y_val),
        ("test", X_test, y_test)):
    print (f"X_{split_name}: {split_X.shape}, y_{split_name}: {split_y.shape}")
print (f"Sample point: {X_train[0]} → {y_train[0]}")
print (f"Classes: {class_counts}")

In [None]:
from sklearn.preprocessing import LabelEncoder

In [None]:
# Output vectorizer: a scikit-learn LabelEncoder, fitted on the training labels in the next cell
y_tokenizer = LabelEncoder()

In [None]:
# Learn the label vocabulary from the training labels only
# (fit() returns the encoder itself, so no re-assignment is needed)
y_tokenizer.fit(y_train)
classes = list(y_tokenizer.classes_)
print (f"classes: {classes}")

In [None]:
# Replace the labels of every split with their encoded form, printing the
# first training label before and after.
# NOTE: y_train/y_val/y_test are overwritten here, so re-running this cell
# feeds already-encoded labels back into the encoder; re-create the splits first.
print (f"y_train[0]: {y_train[0]}")
y_train, y_val, y_test = (
    y_tokenizer.transform(labels) for labels in (y_train, y_val, y_test))
print (f"y_train[0]: {y_train[0]}")

In [None]:
# Inverse-frequency weight per class, computed from the training labels
counts = collections.Counter(y_train)
class_weights = dict((label, 1.0/n_samples) for label, n_samples in counts.items())
print (f"class counts: {counts},\nclass weights: {class_weights}")

In [None]:
from sklearn.preprocessing import StandardScaler

In [None]:
# Standardize the features (mean=0, std=1). The scaler is fitted on the
# training split only and then applied to the train, validation and test
# splits; outputs are not standardized for classification.
X_scaler = StandardScaler()
X_scaler.fit(X_train)
X_train, X_val, X_test = (
    X_scaler.transform(features) for features in (X_train, X_val, X_test))

In [None]:
# Sanity check per split and per feature column: after scaling, the mean
# should be ~0 and the standard deviation ~1
for split_name, split_X in (("X_train", X_train), ("X_val", X_val), ("X_test", X_test)):
    for col in range(2):
        print (f"{split_name}[{col}]: mean: {np.mean(split_X[:, col], axis=0):.1f}, std: {np.std(split_X[:, col], axis=0):.1f}")

## Train torch model

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import Adam
from torchsummary import summary

In [None]:
# Seed PyTorch's global RNG with the notebook-wide SEED for reproducibility
torch.manual_seed(SEED)

In [None]:
# Device name (not referenced by any visible cell; summary() hardcodes "cpu")
DEVICE = 'cpu'
# Width of the hidden layer(s)
HIDDEN_DIM = 100
# Number of input features, read from the training matrix (X is 2-dimensional)
INPUT_DIM = X_train.shape[1]
# One output unit per label known to the encoder (3 classes)
NUM_CLASSES = len(classes)

In [None]:
# Optimization hyperparameters.
# BATCH_SIZE is not referenced by the visible training loop, which feeds the
# whole training set at every step and runs NUM_EPOCHS*10 iterations.
BATCH_SIZE = 32
NUM_EPOCHS = 10
LEARNING_RATE = 1e-2

In [None]:
class MLP(nn.Module):
    """Multilayer perceptron with one hidden layer: Linear -> ReLU -> Linear."""

    def __init__(self, input_dim, hidden_dim, num_classes):
        """Create the two linear layers.

        Args:
            input_dim: Number of input features.
            hidden_dim: Number of units in the hidden layer.
            num_classes: Number of output scores per sample.
        """
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x_in, apply_softmax=False):
        """Run the network on `x_in`.

        Args:
            x_in: Input tensor accepted by `fc1`.
            apply_softmax: When true, the scores are passed through a softmax
                over dimension 1; otherwise the raw scores are returned.

        Returns:
            Tensor of class scores (or probabilities if `apply_softmax`).
        """
        hidden = F.relu(self.fc1(x_in))  # ReLU activation on the hidden layer
        logits = self.fc2(hidden)
        if not apply_softmax:
            return logits
        return F.softmax(logits, dim=1)
    
class MLP3(nn.Module):
    """Multilayer perceptron with two hidden layers of equal width.

    Layout: Linear -> ReLU -> Linear -> ReLU -> Linear.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        """Create the three linear layers.

        Args:
            input_dim: Number of input features.
            hidden_dim: Number of units in each hidden layer.
            num_classes: Number of output scores per sample.
        """
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x_in, apply_softmax=False):
        """Run the network on `x_in`.

        Args:
            x_in: Input tensor accepted by `fc1`.
            apply_softmax: When true, the scores are passed through a softmax
                over dimension 1; otherwise the raw scores are returned.

        Returns:
            Tensor of class scores (or probabilities if `apply_softmax`).
        """
        hidden = x_in
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        logits = self.fc3(hidden)
        if not apply_softmax:
            return logits
        return F.softmax(logits, dim=1)

In [None]:
# Initialize the single-hidden-layer model
model = MLP(input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM, num_classes=NUM_CLASSES)
# Prints the repr of the bound method (which embeds the module's repr), not the
# parameters themselves; call model.named_parameters() to iterate over them.
print (model.named_parameters)
# Layer-by-layer summary from torchsummary for an input with INPUT_DIM features
summary(model, input_size=(INPUT_DIM,), device="cpu")

In [None]:
# # Initialize model3
# model3 = MLP3(input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM, num_classes=NUM_CLASSES)
# print (model3.named_parameters)
# summary(model3, input_size=(INPUT_DIM,), device="cpu")

In [None]:
# Class-weighted cross-entropy loss; the weight vector follows ascending class id
ordered_weights = [class_weights[class_id] for class_id in sorted(class_weights)]
weights = torch.Tensor(ordered_weights)
loss_fn = nn.CrossEntropyLoss(weight=weights)

In [None]:
# Accuracy
def accuracy_fn(y_pred, y_true):
    """Return the percentage of entries where `y_pred` equals `y_true`.

    Args:
        y_pred: Tensor of predicted class ids.
        y_true: Tensor of reference class ids, compared element-wise with `y_pred`.

    Returns:
        Accuracy as a float between 0 and 100.
    """
    hits = torch.eq(y_pred, y_true)
    return (hits.sum().item() / len(y_pred)) * 100

In [None]:
# Optimizer: Adam over all of the model's parameters, stepping with LEARNING_RATE
optimizer = Adam(model.parameters(), lr=LEARNING_RATE) 

In [None]:
# Convert every split to tensors: features via torch.Tensor, labels via
# torch.LongTensor (integer class ids, as required by CrossEntropyLoss targets)
X_train, X_val, X_test = (
    torch.Tensor(features) for features in (X_train, X_val, X_test))
y_train, y_val, y_test = (
    torch.LongTensor(labels) for labels in (y_train, y_val, y_test))

In [None]:
# Training: each iteration runs the model on the entire training set
for epoch in range(NUM_EPOCHS*10):
    # Forward pass yields raw class scores (softmax is not applied)
    y_pred = model(X_train)

    # Weighted cross-entropy between the scores and the encoded labels
    loss = loss_fn(y_pred, y_train)

    # Reset gradients, backpropagate, then update the weights
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Log progress on every tenth iteration only
    if epoch % 10 != 0:
        continue
    predictions = y_pred.max(dim=1).indices # class with the highest score
    accuracy = accuracy_fn(y_pred=predictions, y_true=y_train)
    print (f"Epoch: {epoch} | loss: {loss:.2f}, accuracy: {accuracy:.1f}")

In [None]:
#hide
from loss_landscapes import random_plane
from loss_landscapes.metrics import Loss

In [None]:
# Number of steps passed to random_plane; also the side length of the
# coordinate grids built for the surface plot
STEPS = 20
# torch.manual_seed(SEED)
# np.random.seed(SEED)

In [None]:
# loss_landscapes metric built from loss_fn and the training tensors;
# presumably it evaluates the loss of a model on (X_train, y_train) — confirm against the library
metric = Loss(loss_fn, X_train, y_train)

In [None]:
# Evaluate the metric over a random plane through the model's parameters
# (presumably centred on the trained weights — confirm against the library).
# NOTE(review): seed=123 differs from the notebook-wide SEED — confirm this is intentional.
landscape = random_plane(model, metric, seed=123, distance=.1, steps=STEPS)
# Display the resulting grid of loss values
landscape

In [None]:
# Second landscape from the same call, except that the seed is SEED instead of 123
landscape1 = random_plane(model, metric, seed=SEED, distance=.1, steps=STEPS)

In [None]:
# Compare the two landscapes. Assuming both are NumPy arrays, this is an
# element-wise comparison that displays a boolean grid; use
# np.array_equal(landscape, landscape1) for a single True/False verdict.
landscape == landscape1

In [None]:
# Re-seed PyTorch and draw two random numbers (presumably a quick check that
# seeding makes the draw repeatable)
torch.manual_seed(SEED)
torch.rand(size=(2,))

In [None]:
# Contour map (50 levels) of the loss values stored in `landscape`
fig, ax = plt.subplots()
ax.contour(landscape, levels=50)
ax.set_title('Loss Contours around Trained Model')
plt.show()

In [None]:
# Dimensions of the loss grid
landscape.shape

In [None]:
from mpl_toolkits import mplot3d

In [None]:
# 3-D surface of the loss landscape over a STEPS x STEPS index grid
fig = plt.figure(figsize=(9,6))
ax = plt.axes(projection='3d')
# Grid coordinates get their own names so the dataset's feature matrix `X`
# is not overwritten; re-running earlier cells that read `X` stays safe.
# With meshgrid's default 'xy' indexing: grid_x[i, j] == j and grid_y[i, j] == i.
grid_x, grid_y = np.meshgrid(np.arange(STEPS), np.arange(STEPS))
ax.plot_surface(grid_x, grid_y, landscape, rstride=1, cstride=1, cmap='YlGnBu', edgecolor='none')
ax.set_title('Surface Plot of Loss Landscape')
# plt.show() matches the other plotting cells; fig.show() warns on non-GUI
# (inline) backends
plt.show()