<a href="https://colab.research.google.com/github/jlurgo/Aire-Firmware/blob/master/SingleNeuronClient.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
import matplotlib.pyplot as plt
import numpy as np
import random

def sigmoidal_activation(x):
  return 1 / (1 + math.exp(-x))

def linear_activation(x):
  return x

class StemNeuron:
  def get_activation(self):
    pass

  def get_all_synapses(self):
    return []

class Neuron(StemNeuron):
  def __init__(self, synapsis = [], activation = 'sigmoidal'):
    self.synapsis = synapsis
    self.bias = Synapsis(input=InputNeuron(activation=1))
    if activation == 'linear':
      self._activation_function = linear_activation
    else:
      self._activation_function = sigmoidal_activation

  def fully_connect_with(self, neurons):
    self.synapsis = list(map(lambda n: Synapsis(input=n), neurons)) 

  def get_all_synapses(self):
    all_syn = []
    all_syn.extend(self.synapsis)
    all_syn.append(self.bias)
    return all_syn

  def get_weighed_sum_of_inputs(self):
    return sum(map(lambda s: s.get_output(), self.synapsis), self.bias.get_output())
  
  def get_activation(self):
    return self._activation_function(self.get_weighed_sum_of_inputs())

  def __repr__(self):
    return 'Neuron: Synapses: [{}], Bias: {}'.format(self.synapsis, self.bias)

class InputNeuron(StemNeuron):
  def __init__(self, activation: float = 0):
    self._activation = activation

  def get_activation(self):
    return self._activation

  def set_activation(self, value):
    self._activation = value

  def __repr__(self):
    return 'InputNeuron: {{activation:{}}}'.format(self.get_activation())

class Synapsis:
  def __init__(self, input: StemNeuron = None, weight = None):
    self.input_neuron = input
    self.weight = weight or (random.choice([-1, 1]) * (1.2 - (random.random() * 0.4)))

  def get_output(self):
    return self.input_neuron.get_activation() * self.weight

  def __repr__(self):
    return 'weight:{}'.format(self.weight)

class Perceptron:
  def __init__(self, input_size):
    self.input_layer = [InputNeuron() for x in range(input_size)]
    self.output_neuron = Neuron()
    self.output_neuron.fully_connect_with(self.input_layer)
  
  def output_for(self, input):
    for i, val in enumerate(input):
      self.input_layer[i].set_activation(val)
    return [self.output_neuron.get_activation()]

  def get_all_synapses(self):
    return self.output_neuron.get_all_synapses()

  def __repr__(self):
    return 'Perceptron: InputLayer: {}, OutputNeuron: {}'.format(self.input_layer, self.output_neuron)

class MultilayerPerceptron:
  def __init__(self, layer_sizes, output_activation='sigmoidal'):
    self.layers = []
    for i, size in enumerate(layer_sizes):
      if i == 0:
        self.layers.append([InputNeuron() for n in range(size)])
        continue
      elif i == len(layer_sizes) - 1:
        self.layers.append([Neuron(activation=output_activation) for n in range(size)])
      else:
        self.layers.append([Neuron() for n in range(size)])
      for n in self.layers[i]:      
          n.fully_connect_with(self.layers[i - 1])        

  def output_for(self, input):
    for i, val in enumerate(input):
      self.layers[0][i].set_activation(val)
    return list(map(lambda n: n.get_activation(), self.layers[-1])) 

  def get_all_synapses(self):
    all_syn = []
    for layer in self.layers:
      for n in layer:
        all_syn.extend(n.get_all_synapses())
    return all_syn

  def get_all_weights(self):
    return list(map(lambda s: s.weight, self.get_all_synapses()))

  def copy_weights_from(self, network):
    syn = network.get_all_synapses()
    for i, s in enumerate(self.get_all_synapses()):
      s.weight = syn[i].weight

  def __repr__(self):
    return 'MultilayerPerceptron: Layers: {}'.format(self.layers)
  
class MultilayerPredictor:
  def __init__(self, layer_sizes):
    self.network = MultilayerPerceptron(layer_sizes, 'linear')

  def output_for(self, input):
    return self.network.output_for(input)

  def get_all_synapses(self):
    return self.network.get_all_synapses()

  def __repr__(self):
    return 'MultilayerPredictor: Network: {}'.format(self.network)

class StochasticNeuralNetworkCoach:
  def __init__(self, network):
    self.network = network
    self.last_global_error = 1

  def fit(self, training_set, learning_rate, error_target = 0.01, max_epochs = 10000, weight_batch_size = None):
    global_error_progress = []
    if self.get_global_error(training_set) <= error_target:
      return global_error_progress

    for i in range(max_epochs):
      global_error = self.train_one_epoch(training_set, learning_rate, weight_batch_size)      
      global_error_progress.append(global_error)
      if global_error <= error_target:
        break
    return global_error_progress

  def train_one_epoch(self, training_set, learning_rate, weight_batch_size = None):
    shuffled = list(training_set)
    random.shuffle(shuffled)
    self.last_global_error = 0
    for training_case in shuffled:
      error = self.step_training_for_single_case(training_case["input"], training_case["target"], learning_rate, weight_batch_size)
      self.last_global_error += error 
    self.last_global_error = self.last_global_error / len(training_set)
    return self.last_global_error

  def step_training_for_single_case(self, input, expected_output, learning_rate, weight_batch_size = None):
    all_synapses = self.network.get_all_synapses()
    training_synapses = random.sample(all_synapses, weight_batch_size) if weight_batch_size else all_synapses
    current_error = self.get_error_for_single_case(input, expected_output)
    error_gradient = self.get_error_gradient_for_single_case(current_error, training_synapses, input, expected_output)

    # print('gradient', error_gradient)
    for i, synapse in enumerate(training_synapses):
      synapse.weight -= error_gradient[i] * learning_rate
    
    return current_error
        
  def get_error_gradient_for_single_case(self, current_error, synapses, input, expected_output):
    return list(map(lambda s: self.get_error_derivative_for_single_weight_single_case(s, input, expected_output, current_error),
                    synapses)) 
  
  def get_error_derivative_for_single_weight_single_case(self, synapse, input, expected_output, current_error):     
    delta_w = 0.001 * current_error
    current_weight = synapse.weight
    synapse.weight += delta_w
    delta_error = self.get_error_for_single_case(input, expected_output)
    error_derivative = (delta_error - current_error) / delta_w
    synapse.weight = current_weight
    return error_derivative

  def get_global_error(self, training_set):
    return sum(map(lambda c: self.get_error_for_single_case(c["input"], c["target"]), training_set)) / len(training_set)
  
  def get_error_for_single_case(self, input, expected_output):
    network_output = self.network.output_for(input)
    
    _sum = 0
    for i, val in enumerate(network_output):
      _sum += (expected_output[i] - network_output[i]) ** 2
    return _sum / len(network_output)

class StochasticPredictorCoach():
  def __init__(self, network):
    self.predictor = network

  def train_for_input():
    pass


In [None]:
and_network = Perceptron(2)
and_coach = StochasticNeuralNetworkCoach(and_network)
and_training_set = [{"input": [0, 0], "target": [0]}, {"input": [0, 1], "target": [0]}, {"input": [1, 0], "target": [0]}, {"input": [1, 1], "target": [1]}] 
and_fig, and_ax = plt.subplots()
error_data = and_coach.fit(and_training_set, 0.5)
and_ax.plot(range(len(error_data)), error_data)
plt.show()

In [None]:
or_network = Perceptron(2)
or_coach = StochasticNeuralNetworkCoach(or_network)
or_training_set = [{"input": [0, 0], "target": [0]}, {"input": [0, 1], "target": [1]}, {"input": [1, 0], "target": [1]}, {"input": [1, 1], "target": [1]}] 
or_fig, or_ax = plt.subplots()
error_data = or_coach.fit(or_training_set, 0.5)
or_ax.plot(range(len(error_data)), error_data)
plt.show()

In [None]:
xor_network = MultilayerPerceptron([2, 2, 1])
xor_coach = StochasticNeuralNetworkCoach(xor_network)
xor_training_set = [{"input": [0, 0], "target": [0]}, {"input": [0, 1], "target": [1]}, {"input": [1, 0], "target": [1]}, {"input": [1, 1], "target": [0]}] 
xor_fig, xor_ax = plt.subplots()
error_data = xor_coach.fit(xor_training_set, 0.5)
xor_ax.plot(range(len(error_data)), error_data)
plt.show()

In [None]:
from IPython.display import display, HTML
trackpad_display = HTML('''
  <canvas id="use" style="height: 500px; width: 500px; background-color: orange" width="500" height="500"></canvas>
  <canvas id="play" style="height: 500px; width: 500px; background-color: pink" width="500" height="500"></canvas>
  <div>
    <label>Training set size:</label>
    <label id="training_set_size"></label>
  </div>
  <div>
    <label>Elapsed epochs:</label>
    <label id="epochs"></label>
  </div>
  <div>
    <label>Error:</label>
    <label id="last_error"></label>
  </div>
  <script>
    var sigmoidal_activation = (x) =>  1 / (1 + Math.exp(-x));
    var linear_activation = (x) => x;

    class Neuron {
      constructor(synapsis = [], activation = 'sigmoidal') {
        this.synapsis = synapsis;
        this.bias = new Synapsis(new InputNeuron(1));
        if (activation == 'linear') {
          this._activation_function = linear_activation;
        } else {
          this._activation_function = sigmoidal_activation;
        }
      }      
      fully_connect_with(neurons) {
        this.synapsis = neurons.map((n)=> new Synapsis(n))
      }
      get_all_synapses(){
        var all_syn = [];
        all_syn.push(...this.synapsis, this.bias);
        return all_syn;
      }
      get_weighed_sum_of_inputs(){
        return this.synapsis
                  .map(s => s.get_output())
                  .reduce((sum, out)=> sum + out, this.bias.get_output());
      }
      get_activation(){
        return this._activation_function(this.get_weighed_sum_of_inputs());
      }
      // ()
      //   return 'Neuron: Synapses: [{}], Bias: {}'.format(this.synapsis, this.bias)
    }
    class InputNeuron {
      constructor(activation = 0) {
        this._activation = activation;
      }
      get_all_synapses() {
        return [];
      }
      get_activation(){
        return this._activation;
      }
      set_activation(value){
        this._activation = value;
      }
      // ():
      //   return 'InputNeuron: {{activation:{}}}'.format(self.get_activation())
    }
    class Synapsis {
      constructor(input = null, weight = null) {
        this.input_neuron = input;
        this.weight = weight || (1 - (2 * Math.random()));
      }
      get_output() {
        return this.input_neuron.get_activation() * this.weight;
      }
      // def __repr__():
      //   return 'weight:{}'.format(this.weight)
    }
    // class Perceptron{
    //   constructor(input_size) {
    //     this.input_layer = [new InputNeuron() for x in range(input_size)];
    //     this.output_neuron = new Neuron();
    //     this.output_neuron.fully_connect_with(this.input_layer);
    //   }
    //   output_for(input) {
    //     for i, val in enumerate(input){
    //       this.input_layer[i].set_activation(val);
    //     }
    //     return [this.output_neuron.get_activation()];
    //   }
    //   get_all_synapses() {
    //     return this.output_neuron.get_all_synapses();
    //   }
    //   // ():
    //   //   return 'Perceptron: InputLayer: {}, OutputNeuron: {}'.format(this.input_layer, self.output_neuron)
    // }
    class MultilayerPerceptron {
      constructor(layer_sizes, output_activation='sigmoidal') {
        this.layers = [];
        layer_sizes.forEach((size, i) => {
          if (i == 0) {
            this.layers.push([...Array(size).keys()].map(i => new InputNeuron()));
            return;
          } else if (i == (layer_sizes.length - 1)) {
            this.layers.push([...Array(size).keys()].map(i => new Neuron(output_activation)));
          } else {
            this.layers.push([...Array(size).keys()].map(i => new Neuron()));
          } 
          this.layers[i].forEach(n => {
            n.fully_connect_with(this.layers[i - 1]);
          });
        });
      }
      output_for(input) {
        input.forEach((val, i) => {
          this.layers[0][i].set_activation(val);
        });
        var last_layer = this.layers[this.layers.length - 1];
        return last_layer.map(n => n.get_activation());
      }
      play(initial_input, future_size) {
        var future_points = [];
        var last_output = initial_input;
        [...Array(future_size).keys()].forEach(() => {
          last_output = client_network.output_for(last_output);
          future_points.push(last_output);
        });
        return future_points;
      }
      get_all_synapses() {
        var all_syn = [];
        this.layers.forEach(layer => {
          layer.forEach(n => {
            all_syn.push(...n.get_all_synapses());
          });
        });
        return all_syn;
      }
      copy_weights_from_network(network) {
        var syn = network.get_all_synapses();
        this.get_all_synapses().forEach((s, i) => s.weight = syn[i].weight);
      }
      copy_weights(weights) {
        this.get_all_synapses().forEach((s, i) => s.weight = weights[i]);
      }
      // ():
      //   return 'MultilayerPerceptron: Layers: {}'.format(this.layers)
    }

    var client_network = new MultilayerPerceptron([2, 2]);
    const epochs_display = document.getElementById('epochs');
    const error_display = document.getElementById('last_error');
    const training_set_size_display = document.getElementById('training_set_size');

    const use_canvas = document.getElementById('use');
    const use_context = use_canvas.getContext('2d');

    var batch_collecting = false;
    var batch = [];
    var last_mouse_input = null;
    var last_mouse_input_buffer = [];
    var last_mouse_button = null;
    var next_drawing_output = [];

    var draw_point_on_canvas = (point, color, canvas, context) => {
      context.beginPath();
      context.arc(point[0] * canvas.width, point[1] * canvas.height, 5, 0, 2 * Math.PI, false);
      context.fillStyle = color;
      context.fill();
      context.lineWidth = 1;
      context.strokeStyle = '#003300';
      context.stroke();
    };

    var train_interval = null;
    var sample_interval = null;

    var send_and_receive_training_data = (e) => {
      var data_to_send = last_mouse_input_buffer.splice(0, 100);
      console.log('send to train', data_to_send.length, ' points', ', remaining', last_mouse_input_buffer.length, ' points');
      google.colab.kernel.invokeFunction('use', [data_to_send, 10], {})
      .then((response) => {
        var data = response.data['text/plain'];
        data = data.substring(1, data.length - 1);
        data = JSON.parse(data);
        epochs_display.innerHTML = data[1].trained_epochs;
        error_display.innerHTML = data[1].last_error;
        training_set_size_display.innerHTML = data[1].training_set_size;
        client_network = new MultilayerPerceptron(data[2]);
        client_network.copy_weights(data[0]);
      });
    };
    use_canvas.onmousemove = (e) => {
      var train = e.buttons === 1;
      last_mouse_input = {point:[e.offsetX/use_canvas.width, e.offsetY/use_canvas.height], train: train};
    };
    use_canvas.onmouseenter = (e) => {
      if(sample_interval) clearInterval(sample_interval);
      if(train_interval) clearInterval(train_interval);
      train_interval = setInterval(send_and_receive_training_data, 200);
      sample_interval = setInterval(() => {
        if(last_mouse_input && last_mouse_input.train){
          last_mouse_input_buffer.push({...last_mouse_input});
        }
        if(last_mouse_input) {
          use_context.clearRect(0, 0, use_canvas.width, use_canvas.height);         
          draw_point_on_canvas(last_mouse_input.point,
                              last_mouse_input.train ? 'red':'blue', 
                              use_canvas, use_context);
          client_network.play(last_mouse_input.point, 1).forEach((p) => {
            draw_point_on_canvas(p,'green', use_canvas, use_context);
          });
        }
        last_mouse_input.train = false;
      }, 100);
    };

    /////////////////////// PLAY
    var next_drawing_output_play = [];
    const play_canvas = document.getElementById('play');
    const play_context = play_canvas.getContext('2d');
    var last_point = null;
    var play_sample_interval = null;

    play_canvas.onclick = (e) => {
      last_point = [e.offsetX/play_canvas.width, e.offsetY/play_canvas.height];     
      if (play_sample_interval) clearInterval(play_sample_interval);    
      play_sample_interval = setInterval(() => {
        last_point = client_network.output_for(last_point);        
        play_context.clearRect(0, 0, play_canvas.width, play_canvas.height);         
        draw_point_on_canvas(last_point, 'green', play_canvas, play_context); 
      }, 100);  
    }  
  </script>
''')

In [None]:
import IPython
import uuid
import json 
from threading import Timer
import time
from IPython.display import display, HTML
from google.colab import output

class TrackPad(object):
  def _repr_html_(self):
    output.register_callback("use", self.use_network)
    return trackpad_display.data

  def __init__(self, network_schema, initial_training_set=[]):
    self.training_network = MultilayerPerceptron(network_schema)
    self.predictor_coach = StochasticNeuralNetworkCoach(self.training_network)   
    self.training_set = initial_training_set
    self.network_schema = network_schema
    self.last_error_data = []
    self.train_timer = None
    self.train_network()

  def train_network(self):
    try:
      if len(self.training_set) > 0:
        self.last_error_data += self.predictor_coach.fit(self.training_set, 0.2, error_target = 0.001, max_epochs = 20)     
    except Exception:
      print('error')
    finally:
      self.train_timer = Timer(0.01, self.train_network)
      self.train_timer.start() 

  def add_training_data(self, input_points):
    prediction_distance = 1
    for index, input in enumerate(input_points):
      if index < (len(input_points) - prediction_distance):
        if input['train']:          
          self.training_set.append({'input': input['point'], 'target': input_points[index + prediction_distance]['point']})

  def use_network(self, input_points, future_size=1):  
    self.add_training_data(input_points)
    error = 1
    if len(self.last_error_data) > 0:
      error = self.last_error_data[-1]
    current_weights = self.training_network.get_all_weights()
    training_stats = {"trained_epochs": len(self.last_error_data),
                      "last_error": error,
                      "training_set_size": len(self.training_set)}
    return json.dumps([current_weights, training_stats, self.network_schema])

xor_training_set = [{"input": [0, 0], "target": [0, 0]}, {"input": [0, 1], "target": [1, 1]},
                    {"input": [1, 0], "target": [1, 1]}, {"input": [1, 1], "target": [0, 0]}] 
TrackPad([2, 3, 2], xor_training_set)