In [120]:
import csv
import requests
import urllib

from collections import Counter
from functools import reduce
from math import exp
from random import random, shuffle
from typing import List, Callable, Optional, TypeVar, Tuple

exec('from __future__ import annotations')

# Utility functions

In [3]:
def dot_product(xs: List[float], ys: List[float]) -> float:
    """
    Return the sum of the element-wise products of xs and ys.

    Elements are paired with zip(), so surplus elements of the longer
    list are ignored.
    """
    products = [left * right for left, right in zip(xs, ys)]
    return sum(products)

def sigmoid(x: float) -> float:
    """
    Logistic function 1 / (1 + e^-x), evaluated without overflowing.

    For x >= 0 the textbook form is safe because exp(-x) <= 1. For negative x
    the algebraically equal form e^x / (1 + e^x) is used instead, because
    exp(-x) would raise OverflowError once -x exceeds the float range.
    """
    if x >= 0:
        return 1.0 / (1.0 + exp(-x))
    # exp(x) is in (0, 1) here, so it can only underflow towards 0.0.
    scaled: float = exp(x)
    return scaled / (1.0 + scaled)

def derivative_sigmoid(x: float) -> float:
    """
    Derivative of the sigmoid at x, computed as s * (1 - s) with s = sigmoid(x).
    """
    activation: float = sigmoid(x)
    complement = 1 - activation
    return activation * complement

def normalize(dataset: List[List[float]]) -> None:
    """
    Normalize in place by feature scaling: each column is rescaled to [0, 1].

    Assumes all rows have equal length. An empty dataset is left untouched.
    A column whose values are all equal has no range to scale by, so its
    entries are set to 0.0 instead of dividing by zero.
    """
    if not dataset:
        return
    for col_num in range(len(dataset[0])):
        column: List[float] = [row[col_num] for row in dataset]
        mini = min(column)
        span = max(column) - mini
        for row in dataset:
            # span == 0 means the column is constant; avoid ZeroDivisionError.
            row[col_num] = (row[col_num] - mini) / span if span else 0.0

# Neural Network Components

In [4]:
class Neuron:
    """
    A single unit holding its incoming weights, a learning rate, and an
    activation function together with that function's derivative.
    """

    def __init__(
        self,
        weights: List[float],
        learning_rate: float,
        activation_function: Callable[[float], float],
        derivative_activation: Callable[[float], float]
    ):
        # Values filled in later: the forward pass writes output_cache, and
        # the layer's delta calculations write delta.
        self.delta: float = 0.0
        self.output_cache: float = 0.0
        self.derivative_activation = derivative_activation
        self.activation_function = activation_function
        self.learning_rate = learning_rate
        self.weights = weights

    def output(self, inputs: List[float]) -> float:
        """
        Return the activation of the weighted sum of inputs.

        The raw weighted sum (before activation) is stored in output_cache.
        """
        weighted_sum = dot_product(inputs, self.weights)
        self.output_cache = weighted_sum
        return self.activation_function(weighted_sum)

In [5]:
class Layer:
    """
    A group of neurons fed by an optional previous layer.

    A layer built without a previous layer acts as the input layer: its
    neurons get empty weight lists and outputs() passes the inputs through.
    """

    def __init__(
        self,
        previous_layer,
        num_neurons: int,
        learning_rate: float,
        activation_function: Callable[[float], float],
        derivative_activation: Callable[[float], float]
    ):
        self.previous_layer = previous_layer
        self.neurons: List[Neuron] = []
        for _ in range(num_neurons):
            if previous_layer:
                # One random weight in [0, 1) per neuron of the feeding layer.
                initial_weights: List[float] = [random() for _ in range(len(previous_layer.neurons))]
            else:
                initial_weights = []
            self.neurons.append(
                Neuron(initial_weights, learning_rate, activation_function, derivative_activation)
            )
        self.output_cache: List[float] = [0.0] * num_neurons

    def outputs(self, inputs: List[float]) -> List[float]:
        """Compute, cache and return this layer's outputs for the given inputs."""
        if self.previous_layer:
            self.output_cache = [neuron.output(inputs) for neuron in self.neurons]
        else:
            # Input layer: the inputs are the outputs.
            self.output_cache = inputs
        return self.output_cache

    def calculate_deltas_output(self, expected: List[float]) -> None:
        """Set each neuron's delta from the gap between expected and cached output."""
        for position, neuron in enumerate(self.neurons):
            slope = neuron.derivative_activation(neuron.output_cache)
            gap = expected[position] - self.output_cache[position]
            neuron.delta = slope * gap

    def calculate_deltas_hidden(self, next_layer) -> None:
        """Set each neuron's delta from the weights and deltas of next_layer's neurons."""
        for position, neuron in enumerate(self.neurons):
            # Weight that each neuron of the next layer applies to this neuron's output.
            outgoing_weights: List[float] = [successor.weights[position] for successor in next_layer.neurons]
            successor_deltas: List[float] = [successor.delta for successor in next_layer.neurons]
            back_signal: float = dot_product(outgoing_weights, successor_deltas)
            neuron.delta = neuron.derivative_activation(neuron.output_cache) * back_signal
            

In [127]:
T = TypeVar('T')  # output type of interpretation of neural network

class Network:
    """
    A feed-forward network: an input layer followed by at least two more
    layers, trained one sample at a time with backpropagation.
    """

    def __init__(
        self,
        layer_structure: List[int],
        learning_rate: float,
        activation_function: Callable[[float], float]=sigmoid,
        derivative_activation: Callable[[float], float]=derivative_sigmoid
    ):
        """
        Build one Layer per entry of layer_structure (entry = neuron count).

        Raises ValueError if fewer than 3 layers are requested.
        """
        if len(layer_structure) < 3:
            raise ValueError(f'The network should contain >= 3 layers, got {len(layer_structure)}')
        self.layers: List[Layer] = []
        input_layer: Layer = Layer(
            None,
            layer_structure[0],
            learning_rate,
            activation_function,
            derivative_activation)
        self.layers.append(input_layer)
        for num_neurons in layer_structure[1:]:
            # Each new layer is fed by the most recently added one.
            next_layer = Layer(
                self.layers[-1],
                num_neurons,
                learning_rate,
                activation_function,
                derivative_activation)
            self.layers.append(next_layer)

    def outputs(self, inputs: List[float]) -> List[float]:
        """Pushes input to the first layer, then output of the first layer as input to the second, etc."""
        return reduce(lambda inp, layer: layer.outputs(inp), self.layers, inputs)

    def backpropagate(self, expected: List[float]) -> None:
        """
        Compute deltas for every non-input neuron from the expected output.

        Uses the values cached by the most recent call to outputs().
        """
        # calculate deltas for output neurons
        last_layer: int = len(self.layers) - 1
        self.layers[last_layer].calculate_deltas_output(expected)

        # calculate deltas for hidden layers, from the last hidden layer backwards
        for l in range(last_layer - 1, 0, -1):
            self.layers[l].calculate_deltas_hidden(self.layers[l + 1])

    def update_weights(self) -> None:
        """
        Update weights using deltas calculated by backpropagate().
        """
        for layer in self.layers[1:]:  # skip input layer
            for neuron in layer.neurons:
                for w in range(len(neuron.weights)):
                    neuron.weights[w] = neuron.weights[w] +\
                        neuron.learning_rate * layer.previous_layer.output_cache[w] * neuron.delta

    def train(self, inputs: List[List[float]], expecteds: List[List[float]]) -> None:
        """
        Run one pass over the samples: forward pass, backpropagation and
        weight update for each input / expected-output pair in order.
        """
        for location, xs in enumerate(inputs):
            ys: List[float] = expecteds[location]
            # The forward pass is run for its side effect of filling the
            # output caches that backpropagate() and update_weights() read.
            self.outputs(xs)
            self.backpropagate(ys)
            self.update_weights()

    def validate(
        self,
        inputs: List[List[float]],
        expected: List[T],
        interpret_output: Callable[[List[float]], T]
    ) -> Tuple[int, int, float]:
        """
        Count how many interpreted network outputs equal the expected values.

        Returns (number correct, number of inputs, fraction correct). For an
        empty input list the fraction is reported as 0.0 rather than raising
        ZeroDivisionError.
        """
        total = len(inputs)
        correct = 0

        # The loop variable is not called `exp`, which would shadow math.exp.
        for inp, expected_value in zip(inputs, expected):
            result: T = interpret_output(self.outputs(inp))
            if result == expected_value:
                correct += 1

        percentage: float = correct / total if total else 0.0
        return correct, total, percentage

# Prediction of flower species using the iris dataset

In [7]:
# download the iris dataset
# response = requests.get('http://raw.githubusercontent.com/davecom/ClassicComputerScienceProblemsInPython/master/Chapter7/iris.csv')

# write the data to disk
# with open('iris_dataset.csv', 'w') as f:
#     f.write(response.text)

In [8]:
iris_parameters: List[List[float]] = []      # four numeric features per flower
iris_classifications: List[List[float]] = [] # one-hot target per flower
iris_species: List[str] = []                 # species name per flower

# The csv module asks for files to be opened with newline='' so that it can
# handle line endings itself.
with open('iris_dataset.csv', newline='') as f:
    # csv.reader yields an empty list for a blank line; such rows carry no
    # sample and would otherwise fail at iris[4] below.
    irises: List = [row for row in csv.reader(f) if row]

shuffle(irises)
for iris in irises:
    # The first four columns are the measurements, the fifth is the species.
    parameters: List[float] = list(map(float, iris[:4]))
    iris_parameters.append(parameters)
    species: str = iris[4]
    iris_species.append(species)
    if species == 'Iris-setosa':
        iris_classifications.append([1.0, 0.0, 0.0])
    elif species == 'Iris-versicolor':
        iris_classifications.append([0.0, 1.0, 0.0])
    else:
        # Any other species name is encoded as the third class.
        iris_classifications.append([0.0, 0.0, 1.0])

normalize(iris_parameters)

In [9]:
def iris_interpret_output(output: List[float]) -> str:
    """
    Map the network's output values to a species name.

    The first position that equals the largest value wins, checked in order;
    if neither of the first two positions matches, 'Iris-virginica' is returned.
    """
    peak = max(output)
    for position, label in enumerate(('Iris-setosa', 'Iris-versicolor')):
        if peak == output[position]:
            return label
    return 'Iris-virginica'

In [10]:
# Three layers: 4 input neurons, 6 hidden neurons, 3 output neurons.
iris_network: Network = Network(layer_structure=[4, 6, 3], learning_rate=0.3)

# The first 140 shuffled samples and their one-hot targets form the training set.
iris_trainers: List[List[float]] = iris_parameters[0:140]
iris_correct: List[List[float]] = iris_classifications[0:140]

# Make 50 passes over the training set.
for _ in range(50):
    iris_network.train(inputs=iris_trainers, expecteds=iris_correct)

In [11]:
# Samples 140-149 were not used for training and serve as the validation set.
iris_testers: List[List[float]] = iris_parameters[140:150]
# validate() compares against interpreted species names, so these are strings,
# not one-hot vectors.
iris_testers_correct: List[str] = iris_species[140:150]
# iris_results is (number correct, number tested, fraction correct).
iris_results = iris_network.validate(iris_testers, iris_testers_correct, iris_interpret_output)
print(f"{iris_results[0]} correct of {iris_results[1]} = {iris_results[2] * 100:.2f}")

10 correct of 10 = 100.00


# Prediction of wine type using the wine dataset

In [19]:
# download dataset
# response = urllib.request.urlopen('http://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data')
# with open('wine.csv', 'w') as f:
#     f.write(response.read().decode())

In [27]:
def wine_interpret_output(output: List[float]) -> int:
    """
    Map the network's output values to a wine type number (1, 2 or 3).

    The first position that equals the largest value wins, checked in order;
    if neither of the first two positions matches, 3 is returned.
    """
    peak = max(output)
    for wine_class in (1, 2):
        if peak == output[wine_class - 1]:
            return wine_class
    return 3

In [25]:
# The csv module asks for files to be opened with newline='' so that it can
# handle line endings itself.
with open('wine.csv', newline='') as f:
    # csv.reader yields an empty list for a blank line; drop such rows so
    # later cells can index row[0] safely.
    wine_data = [row for row in csv.reader(f) if row]

In [128]:
# NOTE: this shuffles the loaded rows in place, so every run of this cell
# produces a different train/test split.
shuffle(wine_data)

wine_parameters: List[List[float]] = []      # numeric features per wine
wine_classifications: List[List[float]] = [] # one-hot target per wine
wine_type: List[int] = []                    # class label per wine

for row in wine_data:
    # Column 0 is the class label; every remaining column is a feature.
    wine_parameters.append(list(map(float, row[1:])))
    if row[0] == '1':
        wine_classifications.append([1.0, 0.0, 0.0])
    elif row[0] == '2':
        wine_classifications.append([0.0, 1.0, 0.0])
    else:
        # Any other label is encoded as the third class.
        wine_classifications.append([0.0, 0.0, 1.0])
    wine_type.append(int(row[0]))

normalize(wine_parameters)

# Three layers: 13 input neurons, 13 hidden neurons, 3 output neurons.
wine_network: Network = Network([13, 13, 3], learning_rate=0.9)

# The first 150 shuffled samples are used for training, 50 passes in total.
wine_trainers: List[List[float]] = wine_parameters[:150]
wine_trainers_correct: List[List[float]] = wine_classifications[:150]
for _ in range(50):
    wine_network.train(wine_trainers, wine_trainers_correct)

# Everything after the first 150 samples is held out for validation.
wine_testers: List[List[float]] = wine_parameters[150:]
# validate() compares against interpreted class numbers, so these are ints,
# not one-hot vectors.
wine_testers_correct: List[int] = wine_type[150:]
# wine_results is (number correct, number tested, fraction correct).
wine_results = wine_network.validate(wine_testers, wine_testers_correct, wine_interpret_output)
print(f"{wine_results[0]} correct out of {wine_results[1]} = {wine_results[2] * 100:.2f}%")

28 correct out of 28 = 100.00%
