In [48]:
%%capture
!pip uninstall pytorch

In [49]:
%%capture
!git clone git@github.com:rtqichen/torchdiffeq.git

In [50]:
%%capture
!pip install -e torchdiffeq

In [51]:
%%capture
!pip install tqdm

In [None]:
import os
import argparse
import time
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
# %load ctrnn/datagen.py
import random
import numpy as np

khz = 1000

def random_size():
    return random.randint(5,10)

def random_times(freq):
    sample_rate = 44.1*khz
    period = 2*np.pi*1.0/freq
    times = np.arange(0, period, 1.0/sample_rate)
    return times

def random_phase():
    return np.pi*random.random()

def random_noise():
    return abs(0.3*random.random())

def random_freq(exclude):
    l,h = exclude
    while True:
        freq = 20.0*khz*random.random()
        if not (freq >= l and freq <= h):
            return freq

def sine(inputs,freq,ampl,phase):
    return ampl*np.sin(freq*np.array(inputs)+phase)

def uniform(inputs,value):
    return np.array([value]*len(inputs))

def random_wakeme():
    freq = 8.0*khz
    times = random_times(10*khz)
    noise = np.random.normal(0,0.3,len(times))
    signal = uniform(times,1)
    data = signal
    cat = Category.WAKEME
    return (data,cat)

def random_ignore():
    times = random_times(10*khz)
    noise = np.random.normal(0,0.3,len(times))
    signal = uniform(times,0.0)
    data = signal
    cat = Category.IGNORE
    return (data,cat)

def random_example():
    idx = random.randint(0,1)
    cat = Category.from_code(idx)
    if cat == Category.WAKEME:
        return random_wakeme()
    else:
        return random_ignore()


In [None]:
# %load ctrnn/category.py
class Category(Enum):
    WAKEME = "WAKEME"
    IGNORE = "IGNORE"
    UNKNOWN = "UNKNOWN"
    def code(self):
        if self == Category.WAKEME:
            return 0
        else:
            return 1
        
    @staticmethod
    def from_code(code):
        mapping = {0:Category.WAKEME, 1:Category.IGNORE}
        return mapping[code]
    
    @staticmethod
    def series():
        return [Category.WAKEME, Category.IGNORE]



In [None]:
def gen_dataset(npos=15,nneg=15):
    dataset = []
    dataset += list(map(lambda _: random_wakeme(), range(npos)))
    dataset += list(map(lambda _: random_ignore(), range(nneg)))
    return dataset

def evaluate_genomes(genomes, config, dataset, evaluate, fitness, thresh):
    for genome_id, genome in genomes:
        net = neat.nn.FeedForwardNetwork.create(genome, config)
        
        genome.fitness = 0.0
        for sig,cat in dataset:
            avg_cat = evaluate(net,sig,cat)
            fitval = fitness(avg_cat,cat,thresh)
            if fitval > 0:
                genome.fitness += 1.0/len(dataset)
            
          
        
def test_network(winner,config, evaluate, classify,thresh):
    winner_net = neat.nn.FeedForwardNetwork.create(winner,config)
    fp = 0
    fn = 0
    tp = 0
    tn = 0
    for i in range(1000):
        (sig,cat) = random_example()
        avg_cat = evaluate(winner_net,sig,cat)
        pred_cat = classify(avg_cat,thresh)
        if pred_cat == cat:
            if cat == Category.WAKEME:
                tp += 100.0/1000
            else:
                tn += 100.0/1000
                
        else:
            if cat == Category.WAKEME:
                fn += 100.0/1000
            else:
                fp += 100.0/1000
                
    print("tp=%f tn=%f fp=%f fn=%f" % (tp,tn,fp,fn))
        

In [None]:
import neat

In [None]:
CONTENT = '''
[NEAT]
fitness_criterion     = max
fitness_threshold     = 0.7
pop_size              = 250
reset_on_extinction   = 0

[DefaultGenome]
num_inputs              = 1
num_hidden              = 1
num_outputs             = 1
initial_connection      = partial_direct 0.5
feed_forward            = True
compatibility_disjoint_coefficient    = 1.0
compatibility_weight_coefficient      = 0.6
conn_add_prob           = 0.2
conn_delete_prob        = 0.2
node_add_prob           = 0.2
node_delete_prob        = 0.2
activation_default      = sigmoid
activation_options      = sigmoid
activation_mutate_rate  = 0.0
aggregation_default     = sum
aggregation_options     = sum
aggregation_mutate_rate = 0.0
bias_init_mean          = 0.0
bias_init_stdev         = 1.0
bias_replace_rate       = 0.1
bias_mutate_rate        = 0.7
bias_mutate_power       = 0.5
bias_max_value          = 30.0
bias_min_value          = -30.0
response_init_mean      = 1.0
response_init_stdev     = 0.0
response_replace_rate   = 0.0
response_mutate_rate    = 0.0
response_mutate_power   = 0.0
response_max_value      = 30.0
response_min_value      = -30.0

weight_max_value        = 30
weight_min_value        = -30
weight_init_mean        = 0.0
weight_init_stdev       = 1.0
weight_mutate_rate      = 0.8
weight_replace_rate     = 0.1
weight_mutate_power     = 0.5
enabled_default         = True
enabled_mutate_rate     = 0.01

[DefaultSpeciesSet]
compatibility_threshold = 3.0

[DefaultStagnation]
species_fitness_func = max
max_stagnation  = 20

[DefaultReproduction]
elitism            = 2
survival_threshold = 0.2

'''

with open("config.txt", "w") as fh:
    fh.write(CONTENT)
    

In [None]:
def evaluate(net,sig,cat):
    (sig,cat) = random_example()
    cats = []
    for xi in sig:
        output = net.activate([xi])[0]
        cats.append(output)

    # normalize
    avg_cat = np.mean(cats)
    assert(avg_cat <= 1.0 and avg_cat >= -1.0)
    return avg_cat

def fitness(avg_cat,cat,thresh):
    mag = abs(abs(avg_cat)-thresh)
    
    if cat == Category.WAKEME:
        return mag if avg_cat >= thresh else -mag
    else:
        return -mag if avg_cat >= thresh else mag
    
dataset = gen_dataset(50,50)
def evaluate_tl(net,cfg):
    return evaluate_genomes(net,cfg,dataset,evaluate,fitness,0.5)

# Load configuration.
config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                     neat.DefaultSpeciesSet, neat.DefaultStagnation,
                     'config.txt')

print("create population")
# Create the population, which is the top-level object for a NEAT run.
p = neat.Population(config)

# Add a stdout reporter to show progress in the terminal.
p.add_reporter(neat.StdOutReporter(False))

# Run until a solution is found.
winner = p.run(evaluate_tl)


In [None]:

def classify(value,thresh):
    if value >= thresh:
        return Category.WAKEME
    elif value < thresh:
        return Category.IGNORE
    
test_network(winner,config,evaluate,classify,0.5)