# Mustererkennung – Aufgabenblatt 9

Bei Fragen: [florian.hartmann@fu-berlin.de](mailto:florian.hartmann@fu-berlin.de?subject=[Mustererkennung]) – E-Mail-Titel der mit [ME] oder [Mustererkennung] anfängt

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
import seaborn as sns
sns.set(style="darkgrid")
sns.set(font_scale=1.7)

## Daten laden

Wenn man die auf dem Aufgabenblatt verlinkte pickle-Datei benutzt, kann man die Daten so laden:

In [3]:
import pickle
import gzip
import os

In [4]:
data_path = "D:\\git\\git-meml\\09_exercise\\datasets\\digits-large\\"

In [5]:
 with gzip.open(os.path.join(data_path, "mnist.pkl.gz"), 'rb') as f:
    (X_train, y_train), (X_val, y_val), (X_test, y_test) = pickle.load(f, encoding='iso-8859-1')


FileNotFoundError: [Errno 2] No such file or directory: 'D:\\git\\git-meml\\09_exercise\\datasets\\digits-large\\/mnist.pkl.gz'

Die Daten sind bereits ausreichend normalisiert.

## Allgemeine Klasse für Klassifizierer

In [1]:
class Classifier:
    def score(self, X, y):
        predictions = self.predict(X)
        return np.mean(predictions == y)
    
    def confusion_matrix(self, X, y):
        size = len(set(y))
        predicted = self.predict(X)
        
        results = np.zeros((size, size), dtype=np.int32)

        for pi, yi in zip(predicted, y):
            results[int(pi)][int(yi)] += 1

        return results

## Hilfsfunktionen

Als Aktivierungsfunktion benutzen wir zunächst die logistische Funktion. Später im Semester werden wir noch ReLUs (rectified linear units) sehen.

In [2]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

Um die Bias-Einheiten einfacher zu handhaben, werden zwei Hilfsfunktionen benötigt.

In [3]:
def add_ones(X):
    if len(X.shape) == 1:
        return np.c_[[1], [X]][0]
    else:
        return np.c_[np.ones(len(X)), X]

In [4]:
def without_ones(X):
    return X[1:, :]

Da das Modell zwischen 10 Klassen unterscheiden soll, die keine natürliche Ordnung haben, müssen wir die Ausgabe als Vektor darstellen.
In der Statistik nennt man solche Variablen (labels) kategorisch.

In [5]:
def encode_onehot(y):
    output_dimension = len(set(y))
    t = np.zeros((len(y), output_dimension))

    for i, yi in enumerate(y):
        t[i][yi] = 1.

    return t

Alternativ geht es mit numpy kürzer aber weniger verständlich:

In [6]:
def encode_onehot(y):
    output_dimension = len(set(y))
    return np.eye(output_dimension)[y]

## Gradient Descent

Es gibt viele verschiedene Optimierungsalgorithmen, die die Aktualisierungsregel anpassen. Um diese einfach austauschen zu können, abstrahieren dies die meisten ML-Bibliotheken etwas. Eine eigene Klasse sieht vielleicht zunächst übertrieben aus, es macht aber viel Sinn, wenn man etwas kompliziertere Aktualisierungsregeln betrachtet (z.B. RProp).

In [7]:
class GradientDescent:
    def __init__(self, learning_rate):
        self.learning_rate = learning_rate
        
    def __call__(self, weights, gradients, _):
        return weights - self.learning_rate * gradients

Wie beim Aufgabenblatt zur logistischen Regression gesehen, kann es Sinn machen die Lernrate dynamisch sinken zu lassen.

In [8]:
class DecayedGradientDescent:
    def __init__(self, learning_rate, decay):
        self.learning_rate = learning_rate
        self.decay = decay
        self.last_epoch = -1
        
    def __call__(self, weights, gradients, epoch):
        result = weights - self.learning_rate * gradients
        
        if epoch != self.last_epoch:
            self.learning_rate = self.learning_rate * 1 / (1 + self.decay * epoch)
            self.last_epoch = epoch
            
        return result

## Neuronale Netze

In [9]:
from operator import itemgetter
from random import sample
from time import time

Die folgende Klasse implementiert neuronale Netze mit mini-batch gradient descent. In jeder Iteration werden dabei `batch_size` viele zufällige Datenpunkte ausgesucht und durch das Netz geschickt. Bei `batch_size=1` entspricht das stochastic gradient descent (SGD), bei `batch_size=len(X)` wäre es einfach normaler batch gradient descent.

Nach jeder Epoche wird die Klassifikationsgenauigkeit ausgegeben. Man unterscheidet oft zwischen Epochen und Iterationen:
- Epochen geben an wie oft man durch den gesamten Trainingsdatensatz gegangen ist
- Iterationen sind abhängig vom konkreten Algorithmus (SGD, mini-batch -, batch gradient descent) und bezeichnen einfach die Durchläufe der jeweiligen Schleife

In [10]:
class NeuralNetwork(Classifier):
    def __init__(self, hidden):
        self.hidden = hidden

    def fit(self, X, y, optimizer, epochs=10, test=None, batch_size=128):
        _, input_dimension = X.shape
        output_dimension = len(np.unique(y))
        self.output_dimension = output_dimension
                
        t = encode_onehot(y)
        self._init_weights(input_dimension, output_dimension)
                
        batch_size = min(len(X), batch_size)
        data = zip(X, t)
        
        train_history = []
        test_history = []
        
        best_test = best_iteration = 0
        
        for i in range(epochs):
            t = time()
            
            for _ in range(int(len(X) / batch_size)):
                for Xi, ti in sample(data, batch_size):
                    Os = self._feed_forward(Xi)
                    gradients = self._back_propagate(Os, ti)
                    self.W = optimizer(self.W, gradients, i)

            score = self.score(X, y)
            train_history.append(score)
            output = "Epoch %d/%d: %ds – training %.5f" % (i + 1, epochs, time() - t, score)
            
            if test is not None:
                score = self.score(*test)
                test_history.append(score)
                output += " – validation %.5f" % score
                
                if score > best_test:
                    best_test = score
                    best_iteration = i
                
            print(output)
        
        if test is not None:
            print("=> Best validation accuracy was %.5f at iteration %d" % (best_test, best_iteration + 1))
                    
        return train_history, test_history
    
    def _init_weights(self, input_dimension, output_dimension):
        self.W = []
        previous_dimension = input_dimension
        
        for layer in self.hidden + [output_dimension]:
            self.W.append(self._init_layer(previous_dimension, layer))
            previous_dimension = layer
        
    def _init_layer(self, input_dim, output_dim):
        return np.random.random((input_dim + 1, output_dim)) - 0.5
    
    def _feed_forward(self, X):
        Os = [X]
        O_last = X
        
        for Wi in self.W:
            O_hat = add_ones(O_last)
            O_last = sigmoid(O_hat.dot(Wi))
            Os.append(O_last)
            
        return Os
    
    def _back_propagate(self, Os, ti):
        gradients = []

        # Output layer
        O_last = Os[-1]
        O_last_prev = Os[-2]
        
        e = O_last - ti
        D = np.diag(O_last * (1 - O_last))
        delta = D.dot(e)
        
        gradients.append(np.outer(delta, add_ones(O_last_prev)).T)

        # Hidden layers
        for i in reversed(range(1, len(self.hidden) + 1)):
            O = Os[i]
            D = np.diag(O * (1 - O))

            delta = D.dot(without_ones(self.W[i])).dot(delta)
            gradients.append(np.outer(delta, add_ones(Os[i - 1])).T)
            
        return np.array(gradients)[::-1]
    
    def predict(self, X):
        O_last = self._feed_forward(X)[-1]
        return O_last.argmax(axis=1)


## Visualisierung

In [11]:
def plot(history, start=1):
    acc = history[0][start:]
    val_acc = history[1][start:]
    
    x = range(len(acc))
    
    fig = plt.figure(figsize=(25, 20))
    plt.subplot(2, 1, 1)
    
    plt.plot(x, acc, label='Training Accuracy')
    plt.plot(x, val_acc, label='Validation Accuracy')
    plt.axvline(x=np.argmax(val_acc) + start, color="grey", alpha=.6, label="Best Validation", linestyle="--")
    
    plt.legend(loc='lower right')
    plt.xlabel("epochs")
    plt.ylabel("accuracy")
    
    plt.xlim(1)
    plt.show()

## Aufgabe 1: MNIST

### 80 Neuronen, 0.2 Lernrate

In [17]:
clf = NeuralNetwork(hidden=[80])
history = clf.fit(X_train, y_train, optimizer=GradientDescent(learning_rate=.2), epochs=20, test=(X_val, y_val))
clf.score(X_test, y_test)

NameError: name 'X_train' is not defined

### 80 Neuronen, 0.5 Lernrate

In [18]:
clf = NeuralNetwork(hidden=[80])
history = clf.fit(X_train, y_train, optimizer=GradientDescent(learning_rate=.5), epochs=20, test=(X_val, y_val))
clf.score(X_test, y_test)

NameError: name 'X_train' is not defined

Wenn man lange genug trainiert und genug Neuronen hat, geht die Trainingsgenauigkeit oft sehr nah an 100% heran. Die Testgenauigkeit verbessert sich aber irgendwann nicht mehr, da die Trainingsdaten zu genau gelernt werden. Im schlimmsten Fall wird die Testgenauigkeit dann sogar wieder schlechter.

Eine beliebte Strategie dagegen ist *early stopping*: Man betrachtet die Genauigkeit einer weiteren nicht zum Training benutzter Menge (der Validierungsmenge). Sobald sich diese Genauigkeit nicht mehr verbessert, bricht man ab und nimmt das Modell mit der besten Validierungsgenauigkeit.

In [19]:
plot(history)

NameError: name 'history' is not defined

### 70, 50 Neuronen, 0.4 Lernrate

In [20]:
clf = NeuralNetwork(hidden=[70, 50])
history = clf.fit(X_train, y_train, optimizer=GradientDescent(learning_rate=.4), epochs=20, test=(X_val, y_val))
clf.score(X_test, y_test)

NameError: name 'X_train' is not defined

### 70, 50 Neuronen, 0.2 Lernrate

In [21]:
clf = NeuralNetwork(hidden=[70, 50])
history = clf.fit(X_train, y_train, optimizer=GradientDescent(learning_rate=.2), epochs=20, test=(X_val, y_val))
clf.score(X_test, y_test)

NameError: name 'X_train' is not defined

## Aufgabe 2: Logische Funktionen

In [12]:
import numpy as np

X_bin = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])

y_and = [a & b for a, b in X_bin]
y_or = [a | b for a, b in X_bin]
y_xor = [a ^ b for a, b in X_bin]

Bei or und and brauchen wir keine hidden layers, da die Datensätze perfekt linear separierbar sind.

In [13]:
clf = NeuralNetwork(hidden=[])
clf.fit(X_bin, y_or, epochs=5, optimizer=GradientDescent(learning_rate=5.))
clf.score(X_bin, y_or)

TypeError: Population must be a sequence or set.  For dicts, use list(d).

In [24]:
clf = NeuralNetwork(hidden=[])
clf.fit(X_bin, y_and, epochs=5, optimizer=GradientDescent(learning_rate=5.))
clf.score(X_bin, y_and)

TypeError: Population must be a sequence or set.  For dicts, use list(d).

Bei xor benötigen wir auch Neuronen in hidden layers. Dafür können wir hier nun auch 100% erreichen.

In [None]:
clf = NeuralNetwork(hidden=[10])
clf.fit(X_bin, y_xor, epochs=1000, optimizer=GradientDescent(learning_rate=.5))
clf.score(X_bin, y_xor)