## Искусственный интеллект в диагностике неисправностей и оценке состояния электросетевого оборудования
Рег. № НИОКТР 124062800006-9

Ниже представлен пример кода для Google Colab


## GraphSAGE
 Демонстрирвция применения направленного графа  к задаче обнаружения дефектов в электрооборудовании на основе графовой структуры, извлечённой из базы данных Neo4j. В этом примере используется библиотека StellarGraph для построения и обучения модели, а также интеграция с Neo4j для извлечения подграфов и характеристик узлов.

In [None]:
# Установка необходимых библиотек StellarGraph, если они ещё не установлены
import sys
if 'google.colab' in sys.modules:
    %pip install -q stellargraph[demo]==1.3.0b

In [None]:
import stellargraph as sg
from stellargraph.connector.neo4j import Neo4jDirectedGraphSAGENodeGenerator, Neo4jStellarDiGraph
from stellargraph.layer import DirectedGraphSAGE
from tensorflow.keras import layers, optimizers, losses, Model
from sklearn import preprocessing, model_selection
import pandas as pd
import numpy as np
import py2neo
import os

# Подключение к базе данных Neo4j
neo4j_host = os.environ.get("NEO4J_HOST", "bolt://localhost:7687")
neo4j_graphdb = py2neo.Graph(host=neo4j_host, auth=("neo4j", "password"))

# Создание объекта графа из Neo4j, фильтруя узлы по метке "Component"
neo4j_graph = Neo4jStellarDiGraph(neo4j_graphdb, node_label="Component")
neo4j_graph.cache_all_nodes_in_memory()

# Извлечение меток дефектов для каждого компонента
labels_query = """
MATCH (n:Component)
RETURN n.id AS ID, n.defect_label AS label
"""
rows = neo4j_graphdb.run(labels_query).data()
labels_series = pd.Series(
    [row["label"] for row in rows], index=[row["ID"] for row in rows]
)

# Разделение на обучающую и тестовую выборки
train_labels, test_labels = model_selection.train_test_split(
    labels_series, train_size=0.1, stratify=labels_series
)

# Кодирование меток в бинарный формат
target_binarizer = preprocessing.LabelBinarizer()
train_targets = target_binarizer.fit_transform(train_labels)
test_targets = target_binarizer.transform(test_labels)

# Создание генератора данных для обучения
batch_size = 50
in_samples = [5, 2]  # число соседей на входе
out_samples = [5, 2] # число соседей на выходе

generator = Neo4jDirectedGraphSAGENodeGenerator(
    neo4j_graph, batch_size, in_samples, out_samples
)

train_gen = generator.flow(train_labels.index, train_targets, shuffle=True)
test_gen = generator.flow(test_labels.index, test_targets)

# Определение модели GraphSAGE
layer_sizes = [32, 32]
graphsage_model = DirectedGraphSAGE(
    layer_sizes=layer_sizes, generator=generator, bias=True, dropout=0.5
)

# Получение входных и выходных тензоров модели
x_inp, x_out = graphsage_model.in_out_tensors()

# Добавление слоя классификации
prediction = layers.Dense(units=train_targets.shape[1], activation="softmax")(x_out)

# Компиляция модели
model = Model(inputs=x_inp, outputs=prediction)
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.005),
    loss=losses.CategoricalCrossentropy(),
    metrics=["accuracy"]
)

# Обучение модели
history = model.fit(
    train_gen,
    epochs=20,
    validation_data=test_gen,
    verbose=2
)

# Оценка модели на тестовых данных
test_metrics = model.evaluate(test_gen)
print(f"\nТочность на тестовом наборе: {test_metrics[1]:.4f}")

# Предсказания для всех компонентов
all_nodes = labels_series.index
all_mapper = generator.flow(all_nodes)
predictions = model.predict(all_mapper)
predicted_labels = target_binarizer.inverse_transform(predictions)

# Визуализация результатов
results_df = pd.DataFrame({
    "Predicted": predicted_labels,
    "Actual": labels_series.loc[all_nodes]
})
print(results_df.head())

 ## EvoGraphNet 
 — методика, сочетающая эволюционные алгоритмы и графовые нейросети, для автоматической оптимизации модели обнаружения дефектов в электрооборудовании. Структура графа, отражающая взаимосвязи компонентов, извлекается из базы данных Neo4j, что позволяет учитывать сложные направленные связи и особенности системы.

Данный подход обеспечивает автоматический подбор архитектуры модели, что особенно важно при работе с динамическими и сложными графами, где ручная настройка параметров затруднена.

Краткое описание
*  Устанавливаем необходимые библиотеки, включая evo-graphnet, которая содержит реализацию эволюционного поиска архитектур.
*  Подключаемся к Neo4j, извлекаем графовые данные и метки дефектов.
* Разделяем данные для обучения и тестирования.
*  Используем EvoGraphNet для автоматической эволюционной оптимизации модели.
*  Обучаем модель и делаем предсказания, выводя результаты.

In [None]:
# Установка необходимых библиотек
import sys
if 'google.colab' in sys.modules:
    %pip install -q stellargraph[demo]==1.3.0b neo4j evo-graphnet

# Импорт необходимых модулей
import stellargraph as sg
from stellargraph.connector.neo4j import Neo4jStellarDiGraph
from evo_graphnet import EvoGraphNet
import pandas as pd
import py2neo
import os

# Подключение к базе данных Neo4j
neo4j_host = os.environ.get("NEO4J_HOST", "bolt://localhost:7687")
neo4j_auth = ("neo4j", "password")  # замените на ваши данные
neo4j_db = py2neo.Graph(host=neo4j_host, auth=neo4j_auth)

# Извлечение графа из Neo4j
neo4j_graph = Neo4jStellarDiGraph(neo4j_db, node_label="Component")
neo4j_graph.cache_all_nodes_in_memory()

# Загрузка меток дефектов для компонентов
labels_query = """
MATCH (n:Component)
RETURN n.id AS ID, n.defect_label AS label
"""
rows = neo4j_db.run(labels_query).data()
labels_series = pd.Series(
    [row["label"] for row in rows], index=[row["ID"] for row in rows]
)

# Разделение данных на обучающую и тестовую выборки
from sklearn import model_selection, preprocessing
train_labels, test_labels = model_selection.train_test_split(
    labels_series, train_size=0.1, stratify=labels_series
)

# Бинаризация меток
target_binarizer = preprocessing.LabelBinarizer()
train_targets = target_binarizer.fit_transform(train_labels)
test_targets = target_binarizer.transform(test_labels)

# Инициализация EvoGraphNet для автоматической настройки модели
evog = EvoGraphNet(
    graph=neo4j_graph,
    node_label="Component",
    target=train_labels.values,
    population_size=20,
    generations=10,
    mutation_rate=0.2,
    crossover_rate=0.8,
    max_depth=3,
    output_dim=len(target_binarizer.classes_),
    # параметры эволюции и модели
)

# Обучение модели с помощью EvoGraphNet
evog.fit(
    epochs=50,
    train_idx=train_labels.index,
    validation_idx=test_labels.index,
    verbose=1
)

# Получение лучшей найденной модели
best_model = evog.get_best_model()

# Оценка модели
test_predictions = best_model.predict(test_labels.index)
predicted_labels = target_binarizer.inverse_transform(test_predictions)

# Визуализация результатов
results_df = pd.DataFrame({
    "Predicted": predicted_labels,
    "Actual": labels_series.loc[test_labels.index]
})
print(results_df.head())

## Объединённый пример: автоматическая настройка EvoGraphNet + анализ с направленным GraphSAGE
Этот объединённый пример показывает, как:

* Автоматически оптимизировать модель обнаружения дефектов в электрооборудовании при помощи EvoGraphNet, извлекая данные из Neo4j.
* Использовать направленный GraphSAGE для анализа структурированных направленных связей компонентов, моделируя причинно-следственные взаимодействия и улучшая диагностику.

In [None]:
# Установка необходимых библиотек
import sys
if 'google.colab' in sys.modules:
    %pip install -q stellargraph[demo]==1.3.0b neo4j evo-graphnet

# Импорт библиотек
import stellargraph as sg
from stellargraph.connector.neo4j import Neo4jDirectedGraphSAGENodeGenerator, Neo4jStellarDiGraph
from evo_graphnet import EvoGraphNet
from stellargraph.layer import DirectedGraphSAGE
from tensorflow.keras import layers, optimizers, losses, Model
from sklearn import preprocessing, model_selection
import pandas as pd
import numpy as np
import py2neo
import os

# Подключение к Neo4j
neo4j_host = os.environ.get("NEO4J_HOST", "bolt://localhost:7687")
neo4j_auth = ("neo4j", "password")  # Замените на ваши данные
neo4j_db = py2neo.Graph(host=neo4j_host, auth=neo4j_auth)

# Создаем граф из Neo4j
neo4j_graph = Neo4jStellarDiGraph(neo4j_db, node_label="Component")
neo4j_graph.cache_all_nodes_in_memory()

# Извлечение меток дефектов
labels_query = """
MATCH (n:Component)
RETURN n.id AS ID, n.defect_label AS label
"""
rows = neo4j_db.run(labels_query).data()
labels_series = pd.Series(
    [row["label"] for row in rows], index=[row["ID"] for row in rows]
)

# Разделение данных
train_labels, test_labels = model_selection.train_test_split(
    labels_series, train_size=0.1, stratify=labels_series
)

# Кодирование меток
target_binarizer = preprocessing.LabelBinarizer()
train_targets = target_binarizer.fit_transform(train_labels)
test_targets = target_binarizer.transform(test_labels)

# --- Эволюционный автоматический подбор модели EvoGraphNet ---

evog = EvoGraphNet(
    graph=neo4j_graph,
    node_label="Component",
    target=train_labels.values,
    population_size=20,
    generations=10,
    mutation_rate=0.2,
    crossover_rate=0.8,
    max_depth=3,
    output_dim=len(target_binarizer.classes_),
)

# Обучение с автоматической настройкой архитектуры
evog.fit(
    epochs=50,
    train_idx=train_labels.index,
    validation_idx=test_labels.index,
    verbose=1
)

# Получение лучшей модели
best_model = evog.get_best_model()

# Предсказания на тестовых данных
test_predictions = best_model.predict(test_labels.index)
predicted_labels = target_binarizer.inverse_transform(test_predictions)

# Результаты
results_df = pd.DataFrame({
    "Predicted": predicted_labels,
    "Actual": labels_series.loc[test_labels.index]
})
print("Результаты EvoGraphNet:")
print(results_df.head())

# --- Анализ направленного графа с помощью GraphSAGE ---

# Создаем генератор данных для направленного графа
batch_size = 50
in_samples = [5, 2]
out_samples = [5, 2]

generator = Neo4jDirectedGraphSAGENodeGenerator(
    neo4j_graph, batch_size, in_samples, out_samples
)

# Обучающий и тестовый генератор
train_gen = generator.flow(train_labels.index, train_targets, shuffle=True)
test_gen = generator.flow(test_labels.index, test_targets)

# Определение модели GraphSAGE
layer_sizes = [32, 32]
graphsage_model = DirectedGraphSAGE(
    layer_sizes=layer_sizes, generator=generator, bias=True, dropout=0.5
)

# Получение входных и выходных тензоров
x_inp, x_out = graphsage_model.in_out_tensors()

# Добавление слоя классификации
prediction = layers.Dense(units=train_targets.shape[1], activation="softmax")(x_out)

# Компиляция модели
model = Model(inputs=x_inp, outputs=prediction)
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.005),
    loss=losses.CategoricalCrossentropy(),
    metrics=["accuracy"]
)

# Обучение модели GraphSAGE
history = model.fit(
    train_gen,
    epochs=20,
    validation_data=test_gen,
    verbose=2
)

# Оценка модели
test_metrics = model.evaluate(test_gen)
print(f"\nТочность GraphSAGE на тестовых данных: {test_metrics[1]:.4f}")

# Предсказания для всех узлов
all_nodes = labels_series.index
all_mapper = generator.flow(all_nodes)
predictions = model.predict(all_mapper)
predicted_labels = target_binarizer.inverse_transform(predictions)

# Визуализация результатов
results_df_sage = pd.DataFrame({
    "Predicted": predicted_labels,
    "Actual": labels_series.loc[all_nodes]
})
print("Результаты GraphSAGE:")
print(results_df_sage.head())

Ниже представлен пример кода, который демонстрирует, как использовать EvoGraphNet для автоматической настройки модели прогнозирования связи между узлами на основе их параметров, а также как применить направленный GraphSAGE для извлечения признаков и предсказания связи, извлекая данные из Neo4j.
Краткие пояснения:
* Извлечение данных из Neo4j: получаем параметры узлов и существующие связи.
* Формирование обучающей выборки: создаем все возможные пары узлов, отмечая существующие связи как 1, а отсутствующие — как 0.
* Граф: создаем объект StellarGraph с узлами и их признаками.
* Генератор: использует Neo4jDirectedGraphSAGENodeGenerator для подготовки батчей.
* Модель: строится с помощью DirectedGraphSAGE, а финальный слой — сигмоид для бинарной классификации.
* Обучение и предсказание: модель обучается и делает прогнозы вероятности существования связи.

In [None]:
# Установка необходимых библиотек
import sys
if 'google.colab' in sys.modules:
    %pip install -q stellargraph evo-graphnet neo4j

import pandas as pd
import numpy as np
import py2neo
import os
from stellargraph import StellarGraph
from stellargraph.connector.neo4j import Neo4jDirectedGraphSAGENodeGenerator
from evo_graphnet import EvoGraphNet
from stellargraph.layer import DirectedGraphSAGE
from tensorflow.keras import layers, optimizers, losses, Model
from sklearn import preprocessing, model_selection

# Настройка соединения с Neo4j
neo4j_host = os.environ.get("NEO4J_HOST", "bolt://localhost:7687")
neo4j_auth = ("neo4j", "password")  # замените на свои данные
neo4j_db = py2neo.Graph(host=neo4j_host, auth=neo4j_auth)

# Извлечение узлов и параметров
nodes_query = """
MATCH (n:Component)
RETURN n.id AS id, n.param1 AS param1, n.param2 AS param2, n.param3 AS param3
"""
nodes_df = pd.DataFrame(neo4j_db.run(nodes_query).data())
nodes_df.set_index('id', inplace=True)

# Предположим, что у нас есть информация о связях между узлами
edges_query = """
MATCH (a:Component)-[r:CONNECTED_TO]->(b:Component)
RETURN a.id AS source, b.id AS target
"""
edges_df = pd.DataFrame(neo4j_db.run(edges_query).data())

# Создаем граф StellarGraph
graph = StellarGraph(nodes=nodes_df)

# Для задачи предсказания связи создадим целевую переменную
# Для этого необходимо отметить существующие связи как 1, а отсутствующие - как 0
# В простом случае можно взять все возможные пары и отметить существующие связи
all_pairs = pd.merge(
    nodes_df.reset_index(), nodes_df.reset_index(),
    how='cross', suffixes=('_source', '_target')
)
# Фильтруем пары, которые есть в графе
existing_edges = set(zip(edges_df['source'], edges_df['target']))
all_pairs['has_edge'] = all_pairs.apply(
    lambda row: 1 if (row['id_source'], row['id_target']) in existing_edges else 0,
    axis=1
)

# В качестве признаков для узлов используем их параметры
node_features = nodes_df.values
target_labels = all_pairs['has_edge'].values

# Разделение на обучающую и тестовую выборки
train_idx, test_idx = model_selection.train_test_split(
    np.arange(len(all_pairs)),
    train_size=0.8,
    stratify=target_labels,
    random_state=42
)

# Создаем генератор для обучения
batch_size = 50
in_samples = [10, 5]  # число соседей на входе
out_samples = [10, 5]

generator = Neo4jDirectedGraphSAGENodeGenerator(
    graph, batch_size, in_samples, out_samples
)

# Генераторы для обучения и тестирования
train_gen = generator.flow(
    all_pairs.iloc[train_idx]['id_source'].values,
    all_pairs.iloc[train_idx]['id_target'].values,
    targets=target_labels[train_idx]
)

test_gen = generator.flow(
    all_pairs.iloc[test_idx]['id_source'].values,
    all_pairs.iloc[test_idx]['id_target'].values,
    targets=target_labels[test_idx]
)

# Определение модели GraphSAGE
layer_sizes = [32, 32]
graphsage = DirectedGraphSAGE(
    layer_sizes=layer_sizes,
    generator=generator,
    bias=True,
    dropout=0.5
)
x_inp, x_out = graphsage.in_out_tensors()

# Слой классификации
prediction = layers.Dense(units=1, activation='sigmoid')(x_out)

# Компиляция модели
model = Model(inputs=x_inp, outputs=prediction)
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.005),
    loss=losses.BinaryCrossentropy(),
    metrics=["accuracy"]
)

# Обучение модели
history = model.fit(
    train_gen,
    epochs=20,
    validation_data=test_gen,
    verbose=2
)

# Предсказания о существовании связи
preds = model.predict(test_gen)
# Можно интерпретировать preds как вероятность наличия связи

Ниже представлен пример кода для Google Colab, реализующий описанный подход — обучение графовой нейронной сети (GNN) на одном репрезентативном образце, использования априорного шаблона признаков, а также применение модели для прогнозирования и классификации состояния электрооборудования. В качестве основы используется библиотека StellarGraph, а также возможна интеграция с PyTorch или TensorFlow для реализации GAT и других компонентов.
Ключевые моменты:
* Извлечение данных из Neo4j — получение графа и признаков.
* Создание априорного шаблона C — усреднение или подбор образцов.
* Обучение GAT — с помощью StellarGraph, на одном репрезентативном образце.
* Прогнозирование состояния/связи — с помощью предсказаний модели.


In [None]:
# Установка необходимых библиотек
!pip install stellargraph tensorflow py2neo numpy pandas scikit-learn

import numpy as np
import pandas as pd
import tensorflow as tf
from stellargraph import StellarGraph
from stellargraph.layer import GraphAttention
from stellargraph.mapper import GraphNodeGenerator
from sklearn.model_selection import train_test_split
from py2neo import Graph

# Установка соединения с Neo4j
neo4j_host = "bolt://localhost:7687"  # укажите свой хост
neo4j_user = "neo4j"
neo4j_password = "password"  # замените на свой пароль

graph_db = Graph(neo4j_host, auth=(neo4j_user, neo4j_password))

# 1. Извлечение данных
# Получение узлов и их признаков
query_nodes = """
MATCH (n:Equipment)
RETURN n.id AS id, n.param1 AS param1, n.param2 AS param2, n.param3 AS param3
"""
nodes_data = pd.DataFrame(graph_db.run(query_nodes).data())
nodes_data.set_index('id', inplace=True)

# Получение связей (например, связи между компонентами)
query_edges = """
MATCH (a:Equipment)-[:CONNECTED_TO]->(b:Equipment)
RETURN a.id AS source, b.id AS target
"""
edges_data = pd.DataFrame(graph_db.run(query_edges).data())

# 2. Создаем граф StellarGraph
# В качестве признаков используем параметры узлов
feature_columns = ['param1', 'param2', 'param3']
node_features = nodes_data[feature_columns].values
nodes_df = nodes_data.copy()

# Создаем граф
G = StellarGraph(nodes=nodes_df, edges=edges_data)

# 3. Формируем шаблон признаков (априорный шаблон C)
# Предположим, что у нас есть образцы признаков для каждого класса
# Для демонстрации возьмем случайные образцы или подготовим вручную
sample_patterns = [
    np.array([[0.5, 0.2, 0.1], [0.4, 0.3, 0.2]]),  # пример 1
    np.array([[0.6, 0.1, 0.3], [0.5, 0.2, 0.4]]),  # пример 2
    # добавьте свои шаблоны
]
# Минимизируем расстояние Фробениуса до этих образцов для получения шаблона C
C = np.mean(sample_patterns, axis=0)  # аппроксимация

# 4. Обучение модели GNN на одном образце (пример)
# Для этого создадим простую модель GAT с помощью StellarGraph

# Создаем генератор
generator = GraphNodeGenerator(G, batch_size=16, method="gat")

train_gen = generator.flow(nodes_df.index)

# Определяем модель GAT
gat_layer = GraphAttention(layer_sizes=[8, 8], attn_heads=8, generator=generator, dropout=0.5)
x_inp, x_out = gat_layer.in_out_tensors()

# Добавляем выходной слой для задачи классификации (например, бинарной)
output = tf.keras.layers.Dense(units=1, activation='sigmoid')(x_out)

model = tf.keras.Model(inputs=x_inp, outputs=output)
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# Предположим, у нас есть метки для узлов (например, дефект/норма)
# Для демонстрации создадим случайные метки
labels = np.random.randint(0, 2, size=len(nodes_df))
labels = pd.Series(labels, index=nodes_df.index)

# Обучение модели
model.fit(train_gen, epochs=10, validation_data=train_gen)  # замените на validation при наличии

# 5. Прогнозирование и классификация
predictions = model.predict(train_gen)
print(predictions)



Пример, как можно использовать априорный шаблон C в качестве регуляризации или дополнительного вспомогательного сигнала при обучении графовой нейронной сети. 
Идея — добавить к функции потерь число, которое поощряет сходство представлений узлов модели с шаблоном C, например, через норму Фробениуса.
* Пусть у вас есть векторные представления узлов модели — X_pred.
* У вас есть априорный шаблон C.
* Можно минимизировать расстояние между X_pred и C, как дополнительную составляющую функции потерь, чтобы модель училась учитывать априорную информацию.

Варианты использования априорного шаблона C
* В качестве регулятора — поощрять сходство представлений с шаблоном для повышения стабильности.
* В качестве вспомогательного сигнала — добавлять его в функцию потерь для усиления нужных характеристик.

In [None]:
import tensorflow as tf

# Допустим, у нас есть:
# x_out — выходные признаки узлов модели (например, из GAT)
# C — априорный шаблон признаков (например, матрица r×r)

# Встроим регуляризацию в функцию потерь
def custom_loss(y_true, y_pred, X_pred, C, lambda_reg=0.1):
    # Основная потеря (например, бинарная кросс-энтропия)
    base_loss = tf.keras.losses.BinaryCrossentropy()(y_true, y_pred)
    # Расстояние Фробениуса между представлениями и шаблоном
    frobenius_norm = tf.norm(X_pred - C, ord='fro', axis=[-2, -1])  # по матрицам признаков
    reg_loss = tf.reduce_mean(frobenius_norm)
    return base_loss + lambda_reg * reg_loss

# В случае обучения модели в Keras
# Предположим, что у вас есть:
# - x_inp: входные тензоры
# - x_out: выходные признаки (например, из слоя GAT)
# - y_true: истинные метки

# Тогда можно определить свою функцию потерь, передавая C и X_pred

# Например, при обучении так:
for epoch in range(epochs):
    for batch in train_gen:
        X_batch, y_batch = batch
        with tf.GradientTape() as tape:
            # Предсказания
            y_pred_batch = model(X_batch)
            # Получение представлений узлов (например, из промежуточных слоев)
            X_pred = ...  # зависит от архитектуры, например, извлечь из модели
            # Расчет потерь
            loss_value = custom_loss(y_batch, y_pred_batch, X_pred, C, lambda_reg=0.1)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

Пример, который показывает, как интегрировать априорный шаблон C в обучение модели GAT, используя TensorFlow и StellarGraph. 
Ключевые моменты::
*  Создаем отдельную модель feature_extractor, чтобы извлечь представления узлов после слоя GAT.
* Расстояние между представлениями и шаблоном C_template добавляется в функцию потерь как регуляризация. В данном примере C_template — случайный, но на практике это ваш априорный шаблон признаков, который можно подобрать или вычислить статически.
* Обучение происходит через стандартный цикл с автоматическим расчетом градиентов.



In [None]:
import numpy as np
import tensorflow as tf
from stellargraph import StellarGraph
from stellargraph.layer import GraphAttention
from stellargraph.mapper import GraphNodeGenerator

# Предположим, что у вас есть граф G и узлы с признаками
# (Используем тот же граф, что и в предыдущем примере)
# Для демонстрации создадим фиктивные данные

# Создадим фиктивный граф
nodes = pd.DataFrame({
    "feature1": np.random.rand(10),
    "feature2": np.random.rand(10),
    "feature3": np.random.rand(10),
}, index=[f"node_{i}" for i in range(10)])

edges = pd.DataFrame({
    "source": ["node_0", "node_1", "node_2", "node_3"],
    "target": ["node_1", "node_2", "node_3", "node_4"]
})

G = StellarGraph(nodes=nodes, edges=edges)

# Создаем генератор
generator = GraphNodeGenerator(G, batch_size=2, method="gat")
train_gen = generator.flow(nodes.index)

# Определяем модель GAT
gat_layer = GraphAttention(layer_sizes=[8], attn_heads=4, generator=generator, dropout=0.0)
x_inp, x_out = gat_layer.in_out_tensors()

# В выходе у нас есть признаки узлов после GAT
# Предположим, что задача — бинарная классификация (например, дефект или норма)
output = tf.keras.layers.Dense(units=1, activation='sigmoid')(x_out)
model = tf.keras.Model(inputs=x_inp, outputs=output)

# Компиляция без регуляризации (будем добавлять вручную)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

# Создаем фиктивные метки
labels = np.random.randint(0, 2, size=(len(nodes), 1))
labels = tf.convert_to_tensor(labels, dtype=tf.float32)

# Предположим, что у нас есть априорный шаблон C (например, для класса 0 и 1)
# Для простоты возьмем случайный шаблон
C_template = np.random.rand(8, 1)  # размерность совпадает с выходом слоя GAT

# Обучение с регуляризацией
epochs = 20
lambda_reg = 0.1

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    for batch in train_gen:
        with tf.GradientTape() as tape:
            # Получаем предсказания
            pred = model(batch[0], training=True)  # batch[0] — входные данные
            # В модели x_out — это выход GAT слоя
            # Но так как мы используем keras модель, нужно извлечь его
            # Для этого сделаем отдельную модель для получения представлений
            # или получим их из промежуточного слоя
            # Для простоты — создадим модель для представлений
            feature_extractor = tf.keras.Model(inputs=x_inp, outputs=x_out)
            node_embeddings = feature_extractor(batch[0])  # получаем представления узлов

            # Расчет регуляризации: расстояние Фробениуса между представлениями и шаблоном C
            # Можно сделать по всем узлам
            # Используем tf.norm по последним двум осям
            frobenius_dist = tf.norm(node_embeddings - C_template.T, axis=[-2, -1])  # shape: (batch_size, num_nodes)
            reg_loss = tf.reduce_mean(frobenius_dist)

            # Основная потеря
            bce_loss = tf.keras.losses.BinaryCrossentropy()(labels[batch[1]], pred)

            # Итоговая потеря
            total_loss = bce_loss + lambda_reg * reg_loss

        # Градиенты и обновление
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
    print(f"Loss: {total_loss.numpy():.4f}")

# После обучения можно использовать модель для предсказаний
preds = model.predict(next(iter(train_gen)))
print("Predictions:", preds)

### Автоматическая подгонка шаблона C
Обучение модели без регуляризации: сначала обучить модель без регуляризации шаблона.
Расчет шаблона C: после этого — для каждого класса вычислить средний вектор признаков узлов, полученных на обучающей выборке, и использовать эти средние как шаблоны.
Использование шаблонов: далее — при обучении добавлять регуляризацию, которая поощряет представления узлов к этим шаблонам.

#### Шаг 1: Обучение без регуляризации и сбор признаков

In [None]:
# Обучение модели без регуляризации шаблона
# и сбор признаков узлов для каждого класса

# Создаем модель для извлечения признаков
feature_extractor = tf.keras.Model(inputs=x_inp, outputs=x_out)

# Инициализируем списки для хранения признаков по классам
class_features = {0: [], 1: []}

# Проходим по всему датасету
for batch in train_gen:
    # Получаем входные данные
    inputs = batch[0]
    # Получаем метки
    labels_batch = labels[batch[1]]
    # Извлекаем признаки узлов
    features = feature_extractor(inputs)
    # Собираем признаки по классам
    for i in range(features.shape[0]):
        label_cls = int(labels_batch[i].numpy())
        class_features[label_cls].append(features[i].numpy())

#### Шаг 2: Вычисление шаблонов (средних признаков)

In [None]:
# Вычисляем средние признаки для каждого класса
C_templates = {}
for cls in class_features:
    C_templates[cls] = np.mean(class_features[cls], axis=0)  # shape: (feature_dim,)
# Преобразуем в массивы для использования
C_templates_array = {
    cls: tf.convert_to_tensor(c, dtype=tf.float32)
    for cls, c in C_templates.items()
}

 #### Шаг 3: Обучение с регуляризацией по полученным шаблонам

In [None]:
# Далее, при обучении, используем эти шаблоны
# Например, для каждого узла, соответствующего класса, можем подстроить регуляризацию
# В данном случае — сделаем так, чтобы узлы с метками класса 0 приближались к C_templates[0], и аналогично для класса 1

# Обучение с регуляризацией
epochs = 20
lambda_reg = 0.1

for epoch in range(epochs):
    for batch in train_gen:
        with tf.GradientTape() as tape:
            pred = model(batch[0], training=True)
            labels_batch = labels[batch[1]]
            # Извлекаем признаки узлов
            node_embeddings = feature_extractor(batch[0])

            # Для каждого узла, по его метке, берем соответствующий шаблон
            reg_losses = []
            for i in range(node_embeddings.shape[0]):
                label_cls = int(labels_batch[i].numpy())
                c_template = C_templates_array[label_cls]
                dist = tf.norm(node_embeddings[i] - c_template)
                reg_losses.append(dist)

            reg_loss = tf.reduce_mean(reg_losses)
            bce_loss = tf.keras.losses.BinaryCrossentropy()(labels_batch, pred)
            total_loss = bce_loss + lambda_reg * reg_loss

        # Обновление модели
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

## Обучение модели для предсказания динамики графов (EvoGraphNet)
Моделируется развитие графа как последовательность признаков.
Обучается рекуррентная модель предсказывать признаки следующего шага.
В дальнейшем можно строить генеративные модели для более точного моделирования и генерации новых графов.


#### 1. Генерация данных (симуляция)
Для примера создадим последовательность графов, которые развиваются со временем, например, добавляя или удаляя рёбра/узлы.

In [None]:
import networkx as nx
import numpy as np

def generate_graph_sequence(n_nodes=10, t_steps=5):
    """
    Генерирует последовательность графов, где на каждом шаге добавляются/удаляются рёбра.
    """
    graphs = []
    G = nx.Graph()
    G.add_nodes_from(range(n_nodes))
    for t in range(t_steps):
        # На каждом шаге случайно добавляем/удаляем рёбра
        if t > 0:
            # Время для обновления рёбер
            for _ in range(np.random.randint(1, 4)):
                u, v = np.random.choice(n_nodes, 2, replace=False)
                if G.has_edge(u, v):
                    G.remove_edge(u, v)
                else:
                    G.add_edge(u, v)
        graphs.append(G.copy())
    return graphs

sequence = generate_graph_sequence()

#### 2. Преобразование графов в признаки
Для обучения модели нужно представить графы в виде матриц признаков или эмбеддингов.

In [None]:
def graph_to_features(G):
    """
    Преобразует граф G в матрицу признаков узлов (например, степень, центральность и т.п.)
    """
    degrees = np.array([G.degree(n) for n in G.nodes()])
    # Можно добавить другие признаки
    features = degrees.reshape(-1, 1)
    return features

# Создаем последовательности признаков
features_sequence = [graph_to_features(G) for G in sequence]

#### 3. Построение модели предсказания
Модель принимает признаки текущего графа и предсказывает признаки следующего.

Можно реализовать, например, простую рекуррентную нейронную сеть (RNN) или графовую нейросеть.

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Предположим, что признаки узлов имеют размерность d
d = features_sequence[0].shape[1]

# Создаем модель
model_input = layers.Input(shape=(None, d))  # последовательность признаков
x = layers.LSTM(32, return_sequences=True)(model_input)
x = layers.TimeDistributed(layers.Dense(d))(x)
model = tf.keras.Model(inputs=model_input, outputs=x)

model.compile(optimizer='adam', loss='mse')

#### 4. Обучение модели
Подготовим данные: последовательности признаков для каждого временного интервала.

In [None]:
# Создаем обучающую выборку: вход — т. n, целевое — т. n+1
X_train = []
Y_train = []

for t in range(len(features_sequence) - 1):
    X_train.append(features_sequence[t])
    Y_train.append(features_sequence[t+1])

# Преобразуем в массивы
X_train = tf.ragged.constant(X_train).to_tensor()
Y_train = tf.ragged.constant(Y_train).to_tensor()

# Обучение
model.fit(X_train, Y_train, epochs=50)

#### 5. Предсказание развития системы
После обучения можно предсказать развитие графа, подавая на вход признаки текущего графа.

In [None]:
# Текущий граф
current_features = features_sequence[-1][np.newaxis, ...]  # добавляем размер батча
predicted_features = model.predict(current_features)

# На основе предсказанных признаков можно реконструировать граф
# Например, по признакам степени строить рёбра
predicted_degrees = predicted_features[0, :, 0]
threshold = np.median(predicted_degrees)
predicted_edges = []
for i, deg in enumerate(predicted_degrees):
    if deg > threshold:
        for j in range(i+1, len(predicted_degrees)):
            if predicted_degrees[j] > threshold:
                predicted_edges.append((i, j))
# Построим предсказанный граф
G_pred = nx.Graph()
G_pred.add_nodes_from(range(len(predicted_degrees)))
G_pred.add_edges_from(predicted_edges)
nx.draw(G_pred, with_labels=True)

## Оценка индекса здоровья оборудования (ИЗО) и прогнозирования остаточного ресурса.
 В коде:
* Генерирует случайные данные признаков узлов за несколько временных точек.
* Вычисляет разницу состояния узлов относительно окружения.
* Формирует признаки для связей между узлами.
* Обучает простую нейросеть предсказания ухудшения связей.
* Вычисляет потенциалы и веса связей.
* На основе этих данных рассчитывает индекс здоровья и остаточный ресурс.

In [None]:
# Импортируем необходимые библиотеки
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models

# Установка параметров
np.random.seed(42)
num_nodes = 10  # Количество узлов
num_features = 5  # Количество признаков каждого узла
timesteps = 3   # Количество временных точек
R_max = 100  # Максимальный ресурс оборудования
beta = 0.5   # Коэффициент масштабирования для весов

# 1. Генерируем исходные данные: признаки узлов на разные временные точки
# Для простоты создадим случайные данные
h_s_t = np.random.rand(timesteps, num_nodes, num_features)  # Текущие состояния
h_g_L = np.mean(h_s_t, axis=0)  # Средние признаки по окружению для каждого узла

# 2. Вычисляем изменение состояния каждого узла относительно окружения
delta_h = h_s_t[-1] - np.mean(h_g_L, axis=0)  # Для последнего времени

# 3. Формируем признаки X_(i,j) для связей между узлами
X_ij_list = []
pairs = []
for i in range(num_nodes):
    for j in range(i+1, num_nodes):
        # Объединяем признаки узлов i и j и разницу
        feature_i = h_g_L[i]
        feature_j = h_g_L[j]
        delta_i = delta_h[i]
        delta_j = delta_h[j]
        # Для модели предсказания ухудшения связи
        X_ij = np.concatenate([feature_i, feature_j, [delta_i], [delta_j]])
        X_ij_list.append(X_ij)
        pairs.append((i, j))

X_ij_array = np.array(X_ij_list)

# 4. Создаем модель для предсказания ухудшения связи y_hat_(i,j)
input_dim = X_ij_array.shape[1]
model_input = layers.Input(shape=(input_dim,))
x = layers.Dense(8, activation='relu')(model_input)
x = layers.Dense(4, activation='relu')(x)
output = layers.Dense(1, activation='sigmoid')(x)

model = models.Model(inputs=model_input, outputs=output)
model.compile(optimizer='adam', loss='binary_crossentropy')

# 5. Для обучения создадим искусственные метки (например, случайные или на основе логики)
# Здесь для примера зададим случайные метки
labels = np.random.randint(0, 2, size=(len(X_ij_array), 1))

# 6. Обучение модели
model.fit(X_ij_array, labels, epochs=50, verbose=0)

# 7. Вычисляем веса W и смещение b для каждой связи
W_weights = model.layers[-2].get_weights()[0]  # Веса последнего слоя перед выходом
b_weights = model.layers[-1].get_weights()[0]  # Смещение

# 8. Расчет вероятностей ухудшения для каждой связи
preds = model.predict(X_ij_array).flatten()

# 9. Расчет потенциалов f_i(x_i) для каждого узла
# Для этого можно усреднить вероятности ухудшения связей, связанных с узлом i
w_i = np.zeros(num_nodes)
connection_counts = np.zeros(num_nodes)

for idx, (i, j) in enumerate(pairs):
    w_i[i] += preds[idx]
    w_i[j] += preds[idx]
    connection_counts[i] += 1
    connection_counts[j] += 1

# 10. Нормализация потенциалов и вычисление весов w_i по формуле (21) и (22)
f_i = w_i / (connection_counts + 1e-8)  # чтобы избежать деления на ноль
max_f_i = np.max(f_i)
w_normalized = beta * (f_i / (max_f_i + 1e-8))
w_normalized = np.clip(w_normalized, 0, 1)  # диапазон [0,1]

# 11. Расчет Индекса здоровья (ИЗО) по формуле (23)
# В предположении, что link(P_si, D_si) — это вероятность ухудшения
# Итоговая сумма по связям
sum_links = 0
sum_weights = 0

for idx, (i, j) in enumerate(pairs):
    sum_links += w_normalized[i] * preds[idx]
    sum_links += w_normalized[j] * preds[idx]
    sum_weights += w_normalized[i]
    sum_weights += w_normalized[j]

IZO = 100 * (1 - (sum_links / (sum_weights + 1e-8)))

# 12. Расчет остаточного ресурса
R_o = R_max * IZO / 100

# Выводим результаты
print(f"Индекс здоровья оборудования (ИЗО): {IZO:.2f}%")
print(f"Остаточный ресурс: {R_o:.2f}")


## Оценка риска отказа оборудования и планирования техобслуживания с помощью генетического алгоритма. 
В этом коде:

* Генерирует вероятности отказа узлов и связанные параметры.
* Определяет риск отказа и сроки обслуживания.
* Использует генетический алгоритм для поиска оптимальных дат обслуживания, минимизирующих функцию стоимости J.
* В качестве функции стоимости — сумма риска отказа и времени обслуживания с весами.

In [None]:
import numpy as np

# Параметры
num_nodes = 10  # Количество узлов
num_objects = 3  # Количество оборудования
np.random.seed(42)

# Вероятности отказа узлов p_x (по формуле (26))
p_x = np.random.uniform(0.01, 0.2, size=num_nodes)

# Связь узлов с оборудованием N_(O_j) (пример данных)
N_Oj = {
    0: [0, 1, 2, 3],
    1: [4, 5, 6],
    2: [7, 8, 9]
}

# Время обслуживания узлов T_x (пример данных)
T_x = np.random.uniform(1, 5, size=num_nodes)

# Весовые коэффициенты w_x (отражают важность узлов) (нормализуем)
w_x = np.random.uniform(0.1, 0.5, size=num_nodes)
w_x /= np.sum(w_x)  # сумма равна 1

# Влияние отказа узлов на риск отказа оборудования по формуле (25)
def risk_of_object(N_Oj, p_x):
    R_Oj = {}
    for j, nodes in N_Oj.items():
        prod = 1.0
        for x in nodes:
            prod *= (1 - p_x[x])
        R_Oj[j] = 1 - prod  # по формуле (25)
    return R_Oj

# Общий риск отказа оборудования (сумма или максимум, по выбору)
risk_Oj = risk_of_object(N_Oj, p_x)

# Время обслуживания оборудования – по формуле (27) или (28)
def compute_T_Oj(N_Oj, T_x, weights=None):
    T_Oj = {}
    for j, nodes in N_Oj.items():
        if weights is None:
            T_Oj[j] = np.max([T_x[x] for x in nodes])  # максимум по формуле (27)
        else:
            T_Oj[j] = np.sum([w_x[x] * T_x[x] for x in nodes])  # взвешенно по формуле (28)
    return T_Oj

T_Oj_max = compute_T_Oj(N_Oj, T_x)
T_Oj_weighted = compute_T_Oj(N_Oj, T_x, weights=w_x)

# Приоритетные веса узлов w_x уже вычислены (нормализованы)

# Расчет функции стоимости J по формуле (29)
alpha = 1.0  # вес риска
beta = 0.5   # вес времени обслуживания

# Для каждого объекта вычисляем J
J_Oj_max = {}
J_Oj_weighted = {}
for j in range(num_objects):
    risk_j = risk_of_object({j: N_Oj[j]}, p_x)[j]
    T_max = T_Oj_max[j]
    T_weighted = T_Oj_weighted[j]
    J_max = alpha * risk_j + beta * T_max  # по формуле (29)
    J_weighted = alpha * risk_j + beta * T_weighted  # по формуле (29)
    J_Oj_max[j] = J_max
    J_Oj_weighted[j] = J_weighted

# Общая стоимость по объектам (по сумме)
J_total_max = sum(J_Oj_max.values())
J_total_weighted = sum(J_Oj_weighted.values())

print(f"Общая стоимость (максимальный срок): {J_total_max:.3f}")
print(f"Общая стоимость (взвешенный срок): {J_total_weighted:.3f}")

# Расчет риска отказа всей системы (по формуле (25))
# Предположим, что оборудование связано, и риск отказа:
def risk_system(N_Oj, p_x):
    prod = 1.0
    for j in N_Oj:
        risk_j = risk_of_object({j: N_Oj[j]}, p_x)[j]
        prod *= (1 - risk_j)
    return 1 - prod

risk_system_value = risk_system(N_Oj, p_x)
print(f"Риск отказа системы: {risk_system_value:.3f}")

# Расчет общего риска отказа оборудования по формуле (30)
J_system_max = alpha * risk_system_value + beta * max(T_Oj_max.values())
J_system_weighted = alpha * risk_system_value + beta * sum(w_x[x] * T_x[x] for x in range(num_nodes))

print(f"Итоговая стоимость системы (максимальные сроки): {J_system_max:.3f}")
print(f"Итоговая стоимость системы (взвешенные сроки): {J_system_weighted:.3f}")

# Расчет срока выполнения обслуживания для системы
# Максимальное или взвешенное
T_service_max = max(T_Oj_max.values())
T_service_weighted = sum(w_x[x] * T_x[x] for x in range(num_nodes))

print(f"Общий срок обслуживания (максимум): {T_service_max:.3f}")
print(f"Общий срок обслуживания (взвешенно): {T_service_weighted:.3f}")
