In [None]:
# Импорт необходимых библиотек
import gc
import math
import random
import numpy as np
import copy
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Dense, SimpleRNN, GRU, LSTM, Dropout, TimeDistributed, Input
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import  SGD, RMSprop, Adam
from functools import cmp_to_key
from google.colab import drive

drive.mount('/content/gdrive')

import sys
sys.path.append('/content/gdrive/My Drive/Colab Notebooks/10_nodes_new')

# Определение графа с рёбрами, начальной и конечной вершинами
edges = [[1, 2], [1, 5], [1, 8], [2, 3], [3, 4], [3, 8], [3, 9], [4, 10], [5, 6], [6, 7], [6, 8], [6, 9], [7, 10], [9, 10]]
number_edges = len(edges)  # Количество рёбер
number_steps = 4  # Количество шагов в последовательности
start_node = 1  # Начальная вершина
end_node = 10  # Конечная вершина

# Загрузка данных из CSV файлов
df = pd.read_csv('gdrive/My Drive/Colab Notebooks/10_nodes_new/data/10_nodes_LU.csv')
x = df.iloc[:,:number_edges].values  # Признаки
y = df.iloc[:,number_edges:].values  # Метки

df_hp = pd.read_csv('gdrive/My Drive/Colab Notebooks/10_nodes_new/data/10_nodes_LU_hp.csv')
x_hp = df_hp.iloc[:,:number_edges].values  # Признаки для гиперпараметров
y_hp = df_hp.iloc[:,number_edges:].values  # Метки для гиперпараметров

# Разделение данных на обучающий, тестовый и валидационный наборы
from sklearn.model_selection import train_test_split
x_main_train, x_main_tv, y_main_train, y_main_tv = train_test_split(x, y, test_size=2/5)
x_main_test, x_main_valid, y_main_test, y_main_valid = train_test_split(x_main_tv, y_main_tv, test_size=1/2)

x_hp_train, x_hp_tv, y_hp_train, y_hp_tv = train_test_split(x_hp, y_hp, test_size=2/5)
x_hp_test, x_hp_valid, y_hp_test, y_hp_valid = train_test_split(x_hp_tv, y_hp_tv, test_size=1/2)

# Стандартизация данных
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
x_train = sc.fit_transform(x_main_train)
x_test = sc.transform(x_main_test)
x_valid = sc.transform(x_main_valid)

sc_hp = StandardScaler()
x_train_1 = sc_hp.fit_transform(x_hp_train)
x_test_1 = sc_hp.transform(x_hp_test)
x_valid_1 = sc_hp.transform(x_hp_valid)

# Расширение размерности данных для временных рядов
x_train = np.repeat(x_train[:, np.newaxis, : ], number_steps, axis=1)
x_test = np.repeat(x_test[:, np.newaxis, : ], number_steps, axis=1)
x_valid = np.repeat(x_valid[:, np.newaxis, : ], number_steps, axis=1)

x_train_1 = np.repeat(x_train_1[:, np.newaxis, : ], number_steps, axis=1)
x_test_1 = np.repeat(x_test_1[:, np.newaxis, : ], number_steps, axis=1)
x_valid_1 = np.repeat(x_valid_1[:, np.newaxis, : ], number_steps, axis=1)

# Изменение формы меток для соответствия размерности
y_train = np.reshape(y_main_train, (y_main_train.shape[0], number_steps, number_edges))
y_test = np.reshape(y_main_test, (y_main_test.shape[0], number_steps, number_edges))
y_valid = np.reshape(y_main_valid, (y_main_valid.shape[0], number_steps, number_edges))

y_train_1 = np.reshape(y_hp_train, (y_hp_train.shape[0], number_steps, number_edges))
y_test_1 = np.reshape(y_hp_test, (y_hp_test.shape[0], number_steps, number_edges))
y_valid_1 = np.reshape(y_hp_valid, (y_hp_valid.shape[0], number_steps, number_edges))

# Вывод формы данных
print(x_train.shape, y_train.shape, x_test.shape, y_test.shape, x_valid.shape, y_valid.shape, x_train_1.shape, y_train_1.shape, x_test_1.shape, y_test_1.shape)

# Определение гиперпараметров структуры и обучения
# Гиперпараметры структуры: количество слоёв, количество единиц, dropout, тип ячейки
# Гиперпараметры обучения: скорость обучения, размер батча
HP_1 = [[1, 2, 3, 4, 5],
       [-1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, -1],
       [0.0, 0.1, 0.2, 0.3, 0.4, 0.5],
       [SimpleRNN, GRU, LSTM],
       [SGD, RMSprop, Adam],
       [0.0001, 0.001, 0.01, 0.1],
       [16, 32, 64, 128, 256, 512, 1024]]

# Создание списка количества вариантов гиперпараметров
t = []
for p in HP_1:
    t.append(len(p))

# Инициализация вспомогательных переменных
t1 = []
hp = []
for i in range(t[0]):
    t1.append(t[1])
    t1.append(t[2])
    hp.append(HP_1[1])
    hp.append(HP_1[2])

# Добавление гиперпараметров, начиная с третьего элемента
for i in range(3, len(HP_1)):
    t1.append(t[i])
    hp.append(HP_1[i])

# Определение квадратных корней количества вариантов гиперпараметров
t2 = [int(np.sqrt(itm)) for itm in t1]

# Расчёт общего количества гиперпараметров
n_hp = 2*t[0] + len(HP_1) - 3

# Вывод информации о гиперпараметрах
print(t, t1, t2, n_hp)

# Определение пользовательской функции точности
def custom_accuracy(y_true, y_pred):
    threshold = 0.5
    y_pred_binary = tf.cast(tf.greater_equal(y_pred, threshold), tf.float32)
    y_true = tf.cast(y_true, tf.float32)
    stepwise_correct = tf.reduce_all(tf.equal(y_true, y_pred_binary), axis=-1)
    sequencewise_correct = tf.reduce_all(stepwise_correct, axis=-1)
    acc = tf.reduce_mean(tf.cast(sequencewise_correct, tf.float32))
    return acc

# Создание callback для ранней остановки при обучении модели
callback = tf.keras.callbacks.EarlyStopping(monitor='val_custom_accuracy',
                                            min_delta=0.001, patience=10, verbose=1, mode='max',
                                            restore_best_weights=True)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
(600000, 4, 14) (600000, 4, 14) (200000, 4, 14) (200000, 4, 14) (200000, 4, 14) (200000, 4, 14) (6000, 4, 14) (6000, 4, 14) (2000, 4, 14) (2000, 4, 14)
[5, 22, 6, 3, 3, 4, 7] [22, 6, 22, 6, 22, 6, 22, 6, 22, 6, 3, 3, 4, 7] [4, 2, 4, 2, 4, 2, 4, 2, 4, 2, 1, 1, 2, 2] 14


In [None]:
class Network:
    def __init__(self):
        # Инициализация сети: генерируем решение, инициализируем значения фитнеса и дельты
        self.gen = []  # Генерация гиперпараметров (путь)
        self.fitness = 0  # Фитнес текущего решения
        self.delta = 0  # Изменение феромона на основе решения

    def check_gen(self):
        # Проверка допустимости гиперпараметров
        state = False
        for i in range(n_hp):
            if i % 2 == 0 and i < 2 * t[0] and hp[i][self.gen[i]] != -1:
                state = True
                break
        return state

    def evaluate(self):
        # Оценка решения на основе гиперпараметров
        if self.check_gen() == False:
            return 0
        else:
            input_shape = (number_steps, number_edges)
            inputs = Input(shape=input_shape)
            x_input = inputs
            cell_type = hp[2 * t[0]][self.gen[2 * t[0]]]  # Определяем тип ячейки (SimpleRNN, GRU или LSTM)
            
            # Построение модели на основе выбранных гиперпараметров
            for i in range(t[0]):
                if hp[2 * i][self.gen[2 * i]] != -1:
                    x_input = cell_type(hp[2 * i][self.gen[2 * i]], return_sequences=True)(x_input)
                    x_input = Dropout(hp[2 * i + 1][self.gen[2 * i + 1]])(x_input)
            
            # Выходной слой с сигмоидной активацией
            outputs = TimeDistributed(Dense(number_edges, activation="sigmoid"))(x_input)
            model = Model(inputs=inputs, outputs=outputs)
            
            # Компиляция модели с выбранным оптимизатором и скоростью обучения
            opt = hp[2 * t[0] + 1][self.gen[2 * t[0] + 1]]
            model.compile(loss=BinaryCrossentropy(), optimizer=opt(learning_rate=hp[2 * t[0] + 2][self.gen[2 * t[0] + 2]]), metrics=[custom_accuracy])
            
            # Обучение модели
            model.fit(x_train_1, y_train_1, epochs=100, callbacks=[callback], validation_data=(x_valid_1, y_valid_1), batch_size=hp[2 * t[0] + 3][self.gen[2 * t[0] + 3]], verbose=0)
            
            # Оценка точности на тестовом наборе данных
            pred = model.predict(x_test_1)
            accuracy_hp = custom_accuracy(y_test_1, pred)
            accuracy_result = tf.keras.backend.get_value(accuracy_hp)
            acc = max(int(round(accuracy_result * 100)), 1)
            
            # Очистка сессии для предотвращения утечек памяти
            tf.keras.backend.clear_session()
            return acc

In [None]:
class AS:
    def __init__(self, N, Max, p, a, b, Q):
        # Инициализация алгоритма AS (Ant System)
        self.N = N  # Количество муравьёв
        self.Max = Max  # Максимальное количество итераций
        self.p = p  # Коэффициент испарения феромона
        self.a = a  # Важность феромона
        self.b = b  # Важность эвристической информации
        self.Q = Q  # Интенсивность феромона
        self.colony = [Network() for i in range(self.N)]  # Инициализация муравьёв (колонии)
        self.best = Network()  # Лучшая сеть
        self.pool = []  # Пул уже проверенных решений
        self.t0 = 1 / n_hp  # Начальная концентрация феромона
        self.pheromone = self.create_pheromone()  # Инициализация феромонов

    def colony_fitness(self):
        # Вычисление фитнеса для всей колонии
        for i in range(len(self.colony)):
            self.calculate_fitness(self.colony[i])
            self.show(self.colony[i])
        print("__________________________________________________________________")

    def calculate_fitness(self, new_mem):
        # Вычисление фитнеса для нового решения
        cond = True
        for mem in self.pool:
            if tuple(new_mem.gen) == tuple(mem.gen):
                cond = False
                new_mem.fitness = mem.fitness
                break
        if cond == True:
            new_mem.fitness = new_mem.evaluate()
            self.pool.append(copy.deepcopy(new_mem))

    def complexity(self, m):
        # Вычисление сложности модели
        coeff = None
        r_a = 0
        if m.gen[2 * t[0]] == 0:
            coeff = 1  # SimpleRNN
        elif m.gen[2 * t[0]] == 1:
            coeff = 3  # GRU
            r_a = 1
        else:
            coeff = 4  # LSTM
        a = 0
        g = number_edges
        for i in range(t[0]):
            if hp[2 * i][m.gen[2 * i]] != -1:
                b = coeff * hp[2 * i][m.gen[2 * i]] * (hp[2 * i][m.gen[2 * i]] + g + r_a + 1)
                a += b
                g = hp[2 * i][m.gen[2 * i]]
        a += number_edges * (g + 1)
        return a

    def compare(self, m1, m2):
        # Сравнение двух решений на основе их фитнеса и сложности
        if m1.fitness == m2.fitness:
            a = self.complexity(m1)
            b = self.complexity(m2)
            if a < b:
                return -1
            elif a == b:
                return 0
            else:
                return 1
        elif m1.fitness > m2.fitness:
            return -1
        else:
            return 1

    def create_pheromone(self):
        # Создание начальных значений феромона
        pheromone = copy.deepcopy(hp)
        for i in range(n_hp):
            for j in range(t1[i]):
                pheromone[i][j] = self.t0  # Инициализация феромона на начальном уровне
        return pheromone

    def create_path(self):
        # Создание путей для каждого муравья
        for i in range(self.N):
            gen = []
            for pr in range(n_hp):
                pos_pr = self.get_next_parameter(pr)  # Получение следующего параметра
                gen.append(pos_pr)
            self.colony[i].gen = copy.deepcopy(gen)
            self.calculate_fitness(self.colony[i])
            self.colony[i].delta = self.Q * self.colony[i].fitness  # Обновление дельты для феромона
            self.show(self.colony[i])
            print("__________________________________________________________________")

    def get_next_parameter(self, pr):
        # Получение следующего параметра на основе феромонов и эвристики
        total_prob = 0
        prob = []
        
        # Вычисляем вероятность для каждого значения гиперпараметра
        for i in range(t1[pr]):
            x = self.pheromone[pr][i]  # Значение феромона
            y = 1 / t1[pr]  # Эвристическая информация
            z = pow(x, self.a) * pow(y, self.b)  # Вычисление вероятности
            total_prob += z
            prob.append(z)

        # Нормализация вероятностей
        prob = [prb / total_prob for prb in prob]
        pos_pr = np.random.choice(list(range(t1[pr])), p=prob)  # Выбор на основе вероятностей
        return pos_pr

    def update_pheromone(self):
        # Обновление феромонов на основе муравьёв
        for i in range(n_hp):
            for j in range(t1[i]):
                self.pheromone[i][j] = self.pheromone[i][j] * (1 - self.p)  # Испарение феромона
        for i in range(self.N):
            for j in range(n_hp):
                j2 = self.colony[i].gen[j]
                self.pheromone[j][j2] += self.colony[i].delta  # Обновление феромона на основе дельты

    def current_colony(self):
        # Отображение текущей колонии
        for i in range(len(self.colony)):
            self.show(self.colony[i])
        print("__________________________________________________________________")

    def get_best(self):
        # Получение лучшего решения
        self.colony.sort(key=cmp_to_key(self.compare))
        self.current_colony()
        if self.compare(self.best, self.colony[0]) == 1:
            self.best = copy.deepcopy(self.colony[0])  # Обновление лучшего решения
        self.show(self.best)
        print("__________________________________________________________________")

    def show(self, m):
        # Отображение информации о решении
        p_copy = []
        for j in range(t[0]):
            p_copy.append((m.gen[2 * j], m.gen[2 * j + 1]))
        p_copy.append(m.gen[2 * t[0]:])
        print(p_copy, m.fitness)

    def do(self):
        # Основной цикл выполнения алгоритма AS
        for itr in range(self.Max):
            print("iteration = ", itr)
            print(self.pheromone[10])
            self.create_path()  # Создание путей для муравьёв
            self.update_pheromone()  # Обновление феромонов
            self.get_best()  # Получение лучшего решения
            gc.collect()  # Очистка памяти

In [None]:
# N = 10
# Max = 10
# p = 0.1
# a = 1
# b = 2
# Q = 0.01

In [None]:
# import time
# for _ in range(3):
#     start = time.time()
#     alg = AS(N, Max, p, a, b, Q)
#     alg.do()
#     with open('/content/gdrive/My Drive/Colab Notebooks/10_nodes_new/ACO/acc/AS_3.txt', 'a') as file_1:
#         file_1.write(", ".join(map(str, alg.best.gen)) + '\n')
#     end = time.time()
#     run_time = int(end - start)
#     with open('/content/gdrive/My Drive/Colab Notebooks/10_nodes_new/ACO/acc/AS_time_3.txt', 'a') as file_2:
#         file_2.write(f"{run_time}\n")

iteration =  0
[0.07142857142857142, 0.07142857142857142, 0.07142857142857142]
Restoring model weights from the end of the best epoch: 1.
Epoch 11: early stopping
[(15, 2), (8, 4), (15, 5), (2, 2), (1, 3), [0, 1, 1, 0]] 1
__________________________________________________________________
Restoring model weights from the end of the best epoch: 1.
Epoch 11: early stopping
[(13, 1), (8, 2), (13, 0), (17, 0), (5, 2), [0, 1, 3, 1]] 1
__________________________________________________________________
Restoring model weights from the end of the best epoch: 1.
Epoch 11: early stopping
[(9, 5), (4, 3), (11, 3), (17, 1), (21, 2), [1, 2, 3, 2]] 1
__________________________________________________________________
Restoring model weights from the end of the best epoch: 1.
Epoch 11: early stopping
[(5, 1), (18, 5), (16, 0), (20, 1), (2, 3), [0, 2, 3, 0]] 1
__________________________________________________________________
Restoring model weights from the end of the best epoch: 1.
Epoch 11: early sto

In [None]:
# input_shape = (number_steps, number_edges)
# inputs = Input(shape=input_shape)
# x_input = inputs
# cell_type = hp[2*t[0]][alg.best.gen[2*t[0]]]
# for i in range(t[0]):
#     if hp[2*i][alg.best.gen[2*i]] != -1:
#         x_input = cell_type(hp[2*i][alg.best.gen[2*i]], return_sequences=True)(x_input)
#         x_input = Dropout(hp[2*i+1][alg.best.gen[2*i+1]])(x_input)
# outputs = TimeDistributed(Dense(number_edges, activation="sigmoid"))(x_input)
# model = Model(inputs=inputs, outputs=outputs)
# opt = hp[2*t[0]+1][alg.best.gen[2*t[0]+1]]
# model.compile(loss=BinaryCrossentropy(), optimizer=opt(learning_rate=hp[2*t[0]+2][alg.best.gen[2*t[0]+2]]), metrics = [custom_accuracy])
# model.summary()
# print(alg.complexity(alg.best))
# print(opt)
# history = model.fit(x_train, y_train, epochs=1000, batch_size=hp[2*t[0]+3][alg.best.gen[2*t[0]+3]], callbacks=[callback], validation_data=(x_valid, y_valid), verbose=1)

In [None]:
# from Results import Results
# pred = model.predict(x_train)
# pred_test = model.predict(x_test)
# r = Results(number_edges, edges, start_node, end_node, number_steps)
# accuracy_train = r.get_accuracy(pred, x_main_train, y_train)
# accuracy_test = r.get_accuracy(pred_test, x_main_test, y_test)
# print(accuracy_train, accuracy_test)