In [1]:
import random
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import tkinter as tk
from tkinter import messagebox, ttk

# IIoT Network Class
class IIoTNetwork:
    def __init__(self, num_nodes):
        self.num_nodes = num_nodes
        self.graph = nx.Graph()

        # Create a random IIoT network graph
        for i in range(num_nodes):
            for j in range(i+1, num_nodes):
                if random.random() < 0.4:  # Random connection probability
                    bandwidth = random.randint(50, 200)  # Mbps
                    latency = round(random.uniform(1, 10), 2)  # ms
                    self.graph.add_edge(i, j, bandwidth=bandwidth, latency=latency)

    # Display the IIoT network graph
    def draw_network(self, best_path=None):
        plt.figure(figsize=(8, 6))
        pos = nx.spring_layout(self.graph)
        nx.draw(self.graph, pos, with_labels=True, node_color='lightblue', edge_color='gray')

        # Highlight best path
        if best_path:
            path_edges = [(best_path[i], best_path[i+1]) for i in range(len(best_path)-1)]
            nx.draw_networkx_edges(self.graph, pos, edgelist=path_edges, edge_color='red', width=2)

        plt.title("IIoT Network Topology")
        plt.show()

# PSO Optimization for congestion minimization
class PSO:
    def __init__(self, network, source, destination, num_particles=10, iterations=50):
        self.network = network
        self.source = source
        self.destination = destination
        self.num_particles = num_particles
        self.iterations = iterations
        self.best_path = None
        self.best_congestion = float('inf')

    def calculate_congestion(self, path):
        return sum(self.network.graph[path[i]][path[i+1]]['latency'] for i in range(len(path)-1))

    def optimize(self):
        particles = [self.find_random_path() for _ in range(self.num_particles)]
        velocities = [random.uniform(-1, 1) for _ in range(self.num_particles)]
        personal_best = particles[:]
        personal_best_scores = [self.calculate_congestion(path) for path in personal_best]

        global_best = min(personal_best, key=self.calculate_congestion)
        global_best_score = self.calculate_congestion(global_best)

        for _ in range(self.iterations):
            for i in range(self.num_particles):
                new_path = self.find_random_path()
                new_score = self.calculate_congestion(new_path)

                # Update personal best
                if new_score < personal_best_scores[i]:
                    personal_best[i] = new_path
                    personal_best_scores[i] = new_score

                # Update global best
                if new_score < global_best_score:
                    global_best = new_path
                    global_best_score = new_score

            self.best_path = global_best
            self.best_congestion = global_best_score

        return self.best_path, self.best_congestion

    def find_random_path(self):
        try:
            return nx.shortest_path(self.network.graph, source=self.source, target=self.destination, weight='latency')
        except nx.NetworkXNoPath:
            return []

# Function to validate topology constraints
def validate_topology(network, path):
    isolated_nodes = [node for node in network.graph.nodes() if network.graph.degree(node) == 0]

    # Constraint 1: No isolated nodes
    if isolated_nodes:
        print(f"⚠️ Warning: Isolated nodes detected! {isolated_nodes}")
        return False

    # Constraint 2: Ensure all nodes have at least one connection
    for node in network.graph.nodes():
        if network.graph.degree(node) < 1:
            print(f"⚠️ Warning: Node {node} has insufficient connections!")
            return False

    # Constraint 3: Check if path exists and meets congestion criteria
    if path:
        total_latency = sum(network.graph[path[i]][path[i+1]]['latency'] for i in range(len(path)-1))
        total_bandwidth = min(network.graph[path[i]][path[i+1]]['bandwidth'] for i in range(len(path)-1))

        print("\n✅ Topology Constraints Satisfied ✅")
        print(f"Optimal Path: {path}")
        print(f"Total Latency: {total_latency:.2f} ms")
        print(f"Minimum Bandwidth on Path: {total_bandwidth} Mbps")

        return True
    else:
        print("❌ No valid path found that meets constraints!")
        return False

# GUI Application
class IIoTApp:
    def __init__(self, root):
        self.root = root
        self.root.title("IIoT Network Congestion Minimization")

        self.network = IIoTNetwork(num_nodes=10)

        # Labels
        self.label = tk.Label(root, text="IIoT Network Congestion Minimization", font=("Arial", 14))
        self.label.pack()

        # Dropdowns for source and destination selection
        self.node_options = list(self.network.graph.nodes)
        self.source_var = tk.StringVar(root)
        self.dest_var = tk.StringVar(root)

        self.source_var.set(self.node_options[0])  # Default selection
        self.dest_var.set(self.node_options[1])

        self.source_label = tk.Label(root, text="Select Source Node:")
        self.source_label.pack()
        self.source_dropdown = ttk.Combobox(root, textvariable=self.source_var, values=self.node_options)
        self.source_dropdown.pack()

        self.dest_label = tk.Label(root, text="Select Destination Node:")
        self.dest_label.pack()
        self.dest_dropdown = ttk.Combobox(root, textvariable=self.dest_var, values=self.node_options)
        self.dest_dropdown.pack()

        # Buttons
        self.optimize_button = tk.Button(root, text="Optimize Path", command=self.run_optimization)
        self.optimize_button.pack()

        self.draw_button = tk.Button(root, text="Show Network", command=self.network.draw_network)
        self.draw_button.pack()

    def run_optimization(self):
        try:
            source = int(self.source_var.get())
            destination = int(self.dest_var.get())

            if source == destination:
                messagebox.showerror("Error", "Source and Destination must be different!")
                return

            optimizer = PSO(self.network, source, destination)
            best_path, congestion = optimizer.optimize()

            if validate_topology(self.network, best_path):
                self.network.draw_network(best_path)
                messagebox.showinfo("Optimal Path", f"Best Path: {best_path}\nMinimized Congestion: {congestion:.4f}")
            else:
                messagebox.showerror("Topology Issue", "The IIoT network does not satisfy topology constraints!")

        except ValueError:
            messagebox.showerror("Error", "Invalid node selection!")

# Run Application
if __name__ == "__main__":
    root = tk.Tk()
    app = IIoTApp(root)
    root.mainloop()
