In [119]:
import random
import math
import numpy as np
from skfuzzy import control as ctrl
from enum import Enum
from sklearn.metrics import mean_squared_error, mean_absolute_error, max_error
import skfuzzy as fuzz

In [120]:
N_BRANCHES = 10
ANTS_N = 15
TERMS_N = 5
ITER_N = 10

In [121]:
class Bounds:
    def __init__(self, left, right):
        self.left = left
        self.right = right

X_BOUNDS = Bounds(-5., 5.)
Y_BOUNDS = Bounds(-5., 5.)
Z_BOUNDS = Bounds(-1., 1.)

PI = 3.1415926535897932384626433832795

def F(x, y):
    return np.sin(2*x/PI) * np.sin(2*y/PI)

In [122]:
class GraphType(Enum):
    MIDDLE = "middle"
    LEFT = "left"
    RIGHT = "right"

class GraphNode:
    v = 0
    desire = 0.1
    next = {}
    def __init__(self, v, next):
        self.v = v
        self.next = next

class Graph:
    v = {}
    def __init__(self, n_branches, graph_type):
        v0 = {}
        rng = np.linspace(0, 1, n_branches)
        if graph_type == GraphType.LEFT:
            v2 = {}
            for i in rng:
                v2[i] = GraphNode(i, {})
            v1 = {0: GraphNode(0, v2)}
            v0 = {0: GraphNode(0, v1)}
            self.v = v0
        elif graph_type == GraphType.RIGHT:
            i = n_branches-1
            v2 = {i: GraphNode(i, {})}
            v1 = {i: GraphNode(i, v2)}
            v0 = {}
            for i in rng:
                v0[i] = GraphNode(i, v1)
            self.v = v0
        elif graph_type == GraphType.MIDDLE:
            v0 = {}
            for i0 in rng:
                v1 = {}
                for i1 in rng:
                    v2 = {}
                    for i2 in rng:
                        v2[i2] = GraphNode(i2, {})
                    v1[i1] = GraphNode(i1, v2)
                v0[i0] = GraphNode(i0, v1)
            self.v = v0
        else:
            raise ValueError("unknown graph type")

    def choose_point(m):
        nodes_arr = []
        probabilities_arr = []
        for k, v in m.items():
            nodes_arr.append(k)
            probabilities_arr.append(v.desire)
        p = random.choices(nodes_arr, weights=probabilities_arr)
        return p[0]

    def choose_path(self):
        p0 = Graph.choose_point(self.v)
        p1 = Graph.choose_point(self.v[p0].next)
        p2 = Graph.choose_point(self.v[p0].next[p1].next)
        return (p0, p1, p2)

    # (p0, p1, p2) -> weight
    def update_desire(self, paths_weights):
        for k0, v0 in self.v.items():
            for path, weight in paths_weights.items():
                if path[0] != k0:
                    continue
                v0.desire += weight
            for k1, v1 in v0.next.items():
                for path, weight in paths_weights.items():
                    if path[1] != k1:
                        continue
                    v1.desire += weight
                for k2, v2 in v1.next.items():
                    for path, weight in paths_weights.items():
                        if path[2] != k2:
                            continue
                        v2.desire += weight


In [123]:
def fitness(x_abcs, y_abcs, z_abcs):
    step = 0.25

    x_universe = np.arange(X_BOUNDS.left, X_BOUNDS.right+2*step, step)
    y_universe = np.arange(Y_BOUNDS.left, Y_BOUNDS.right+2*step, step)
    z_universe = np.arange(Z_BOUNDS.left, Z_BOUNDS.right+2*step, step)

    xv = ctrl.Antecedent(x_universe, 'x')
    yv = ctrl.Antecedent(y_universe, 'y')
    zv = ctrl.Consequent(z_universe, 'z')

    for i in range(len(x_abcs)):
        xv[f"{i+1}"] = fuzz.trimf(xv.universe, x_abcs[i])
        yv[f"{i+1}"] = fuzz.trimf(yv.universe, y_abcs[i])
        zv[f"{i+1}"] = fuzz.trimf(zv.universe, z_abcs[i])

    rules = generate_rules(x_abcs, y_abcs, xv, yv, zv)

    system = ctrl.ControlSystem(rules=rules)
    sim = ctrl.ControlSystemSimulation(system)

    obs_n = 10
    err_metric = simulation_error_metric(sim, obs_n)
    return err_metric

def generate_rules(x_abcs, y_abcs, xv, yv, zv):
    rules_dict = {}
    for x_abc in x_abcs:
        x_value = x_abc[1]
        x_term = find_term(xv.universe, xv, x_value)
        for y_abc in y_abcs:
            y_value = y_abc[1]
            y_term = find_term(yv.universe, yv, y_value)
            z_value = F(x_value, y_value)
            z_term = find_term(zv.universe, zv, z_value)
            rules_dict[(x_term, y_term)] = z_term
    rules = []
    for x_term, y_term in rules_dict:
        z_term = rules_dict[(x_term, y_term)]
        rule = ctrl.Rule(antecedent=(xv[x_term] & yv[y_term]),
                        consequent=zv[z_term])
        rules.append(rule)
    return rules

def find_term(universe, antecedent, v):
    max_deg = 0
    max_term = ''
    for term in antecedent.terms:
        deg = fuzz.fuzzymath.interp_membership(
            universe, 
            antecedent[term].mf, 
            v
        )
        if deg <= max_deg:
            continue
        max_deg = deg
        max_term = term
    return max_term

def simulation_error_metric(sim, obs_n):
    observations = []
    obs_zs = []
    real_zs = []
    for xi in np.linspace(X_BOUNDS.left, X_BOUNDS.right, obs_n):
        for yi in np.linspace(Y_BOUNDS.left, Y_BOUNDS.right, obs_n):
            sim.input['x'] = xi
            sim.input['y'] = yi
            sim.compute()
            obs_z = sim.output['z']
            real_z = F(xi, yi)
            observations.append([xi, yi, obs_z])
            obs_zs.append(obs_z)
            real_zs.append(real_z)
    mse = mean_squared_error(real_zs, obs_zs)
    return mse

In [124]:
def denormalize(x, a, b):
    return x*(b-a)+a

def path_to_abcs(path, a, b):
    points = np.linspace(a, b, TERMS_N)
    s = (points[1] - points[0]) * 0.45
    abcs = [
        [points[0], points[0], points[1]],
    ]
    for i in range(1, TERMS_N-1):
        left = denormalize(path[0], points[i-1], points[i-1]+s)
        middle = denormalize(path[1], points[i]-s, points[i]+s)
        right = denormalize(path[2], points[i+1]-s, points[i+1])
        abcs.append([left, middle, right])
    abcs.append([points[-2], points[-1], points[-1]])
    return abcs


In [125]:
g = Graph(N_BRANCHES, GraphType.MIDDLE)
for i in range(ITER_N):
    paths = {}
    for _ in range(ANTS_N):
        path = g.choose_path()
        if not path in paths:
            paths[path] = 1
        else:
            paths[path] += 1

    paths_fitness = {}
    paths_weights = {}
    best_path = None
    for path, count in paths.items():
        x_abcs = path_to_abcs(path, X_BOUNDS.left, X_BOUNDS.right)
        y_abcs = path_to_abcs(path, Y_BOUNDS.left, Y_BOUNDS.right)
        z_abcs = path_to_abcs(path, Z_BOUNDS.left, Z_BOUNDS.right)
        f = fitness(x_abcs, y_abcs, z_abcs)
        paths_weights[path] = count/f
        paths_fitness[path] = f
        if best_path == None or f < best_path[1]:
            best_path = (path, f)
    print(f"#{i} best_path = {best_path[0]}, best_fitness = {best_path[1]}")
    g.update_desire(paths_weights)
    
    

#0 best_path = (0.6666666666666666, 0.3333333333333333, 0.8888888888888888), best_fitness = 0.04645188723434462
#1 best_path = (0.2222222222222222, 0.4444444444444444, 1.0), best_fitness = 0.041367924985850735
#2 best_path = (0.3333333333333333, 0.4444444444444444, 0.8888888888888888), best_fitness = 0.04222224026886114
#3 best_path = (0.0, 0.4444444444444444, 0.8888888888888888), best_fitness = 0.04116805604454405
#4 best_path = (0.3333333333333333, 0.4444444444444444, 1.0), best_fitness = 0.04256622941776708
#5 best_path = (0.5555555555555556, 0.3333333333333333, 0.8888888888888888), best_fitness = 0.04417594151785381
#6 best_path = (0.5555555555555556, 0.4444444444444444, 1.0), best_fitness = 0.046121830415184376
#7 best_path = (0.3333333333333333, 0.4444444444444444, 0.4444444444444444), best_fitness = 0.04627250466385882
#8 best_path = (0.5555555555555556, 0.4444444444444444, 0.8888888888888888), best_fitness = 0.045639334071940894
#9 best_path = (0.5555555555555556, 0.33333333333