In [1]:
import codecs
import hashlib
import json
import socket
import threading
import time
import uuid

In [None]:
### TODO: at most 5 *queued* (not yet accepted) connections via the listen backlog ==> change?

In [16]:
class Peer_node:
  """A small TCP peer-to-peer node.

  Each node listens for inbound connections, can dial other nodes, and
  exchanges JSON messages with every connected peer.

  Wire format: every message (including the initial handshake) is one UTF-8
  encoded JSON object written to the TCP stream with no delimiter. Because TCP
  does not preserve message boundaries, the receiving side re-assembles
  objects incrementally (see ``_iter_messages``) instead of assuming that one
  ``recv()`` call equals one message.

  Attributes:
    host, port: address this node listens on.
    node_id: SHA-256 hex digest of ``"host:port"``.
    peers: ``peer_id -> (socket, (host, port))`` for every connected peer.
    data_store: ``key -> value`` dictionary served to peers.
    server_socket: listening socket, or ``None`` while the node is stopped.
    running: ``True`` between ``start()`` and ``stop()``.
  """

  # Upper bound on buffered, not-yet-parseable text per connection. Without it
  # a peer sending garbage would make the receive buffer grow without limit.
  _MAX_BUFFERED_CHARS = 1 << 20

  def __init__(self, host, port):
    self.host = host
    self.port = port

    # The node id is the hash of "host:port", so it is stable across restarts.
    combined_host_port = f"{host}:{port}"
    self.node_id = hashlib.sha256(combined_host_port.encode()).hexdigest()

    self.peers = {}
    self.data_store = {}
    self.server_socket = None
    self.running = False

    print(f"Node {self.node_id} initialized at {self.host}:{self.port}")

  ### Node Properties
  def get_id(self):
    """Return this node's id (SHA-256 hex digest of ``"host:port"``)."""
    return self.node_id

  def get_host(self):
    """Return the host this node listens on."""
    return self.host

  def get_port(self):
    """Return the port this node listens on."""
    return self.port

  def get_peers(self):
    """Return the live ``peer_id -> (socket, (host, port))`` dictionary."""
    return self.peers

  ### Internal helpers

  def _iter_messages(self, sock):
    """Yield JSON documents read from ``sock`` until the peer closes it.

    Bytes are decoded incrementally (a multi-byte UTF-8 character may be split
    across two ``recv()`` calls) and buffered until a complete JSON document
    can be parsed. This handles both several messages arriving in one chunk
    and one message arriving in several chunks.

    Raises:
      ValueError: the connection closed in the middle of a message, or the
        buffered data exceeded ``_MAX_BUFFERED_CHARS`` without ever forming a
        valid document (``json.JSONDecodeError`` is a ``ValueError``).
      OSError: propagated from ``sock.recv``.
    """
    text_decoder = codecs.getincrementaldecoder('utf-8')()
    json_decoder = json.JSONDecoder()
    pending = ""
    while True:
      chunk = sock.recv(4096)
      if not chunk:
        if pending.strip():
          raise ValueError("Connection closed in the middle of a JSON message")
        return
      pending += text_decoder.decode(chunk)
      while True:
        # raw_decode() does not accept leading whitespace.
        pending = pending.lstrip()
        if not pending:
          break
        try:
          message, end = json_decoder.raw_decode(pending)
        except json.JSONDecodeError:
          if len(pending) > self._MAX_BUFFERED_CHARS:
            raise
          break  # Most likely an incomplete document: wait for more bytes.
        pending = pending[end:]
        yield message

  @staticmethod
  def _close_socket(sock, description):
    """Shut down and close ``sock``; never raises for socket errors.

    ``shutdown()`` is called first so that a thread blocked in ``recv()`` or
    ``accept()`` on this socket is woken up; ``close()`` alone only releases
    the descriptor. ``shutdown()`` fails with ``OSError`` on sockets that are
    already closed or not connected, which is harmless here.
    """
    try:
      sock.shutdown(socket.SHUT_RDWR)
    except OSError:
      pass
    try:
      sock.close()
    except Exception as e:
      print(f"Error closing socket for {description}: {e}")

  def _release_connection(self, peer_id, sock):
    """Clean up after a connection whose reader loop has ended.

    The peer entry is removed only when it is registered with *this* socket.
    A duplicate connection from an already-known peer must not tear down the
    connection that is actually stored in ``self.peers``.
    """
    entry = self.peers.get(peer_id) if peer_id is not None else None
    if entry is not None and entry[0] is sock:
      self.remove_peer(peer_id)
    else:
      self._close_socket(sock, "unregistered connection")

  ### Connection to and communication with other peers

  def start(self, backlog=5):
    """Start listening for incoming connections.

    Args:
      backlog: number of not-yet-accepted connections the OS may queue
        (passed to ``socket.listen``). This is not a limit on the number of
        connected peers.

    Raises:
      OSError: the address could not be bound. The socket is closed before
        the error is re-raised.
    """
    if self.running:
      print(f"Node {self.node_id} is already running.")
      return

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
      server_socket.bind((self.host, self.port))
      server_socket.listen(backlog)
    except Exception:
      server_socket.close()
      raise

    self.server_socket = server_socket
    self.running = True
    print(f"Node {self.node_id} listening on {self.host}:{self.port}")

    # Accept connections in the background so start() returns immediately.
    threading.Thread(target=self._accept_connections, daemon=True).start()

  def _accept_connections(self):
    """Accept inbound connections until the node stops or accept() fails."""
    # Bind the loop to the socket it was started for, so a later
    # stop()/start() cycle cannot make this thread accept on the new socket.
    server_socket = self.server_socket
    while self.running and server_socket is not None:
      try:
        conn, addr = server_socket.accept()
        print(f"Incoming connection from {addr}")
        # Each peer gets its own reader thread.
        threading.Thread(target=self._handle_client_connection, args=(conn, addr), daemon=True).start()
      except Exception as e:
        # During stop() the socket is closed on purpose; stay quiet then.
        if self.running:
          print(f"Error accepting connection: {e}")
        break

  def _handle_client_connection(self, conn, addr):
    """Serve one inbound connection: handshake, then process messages.

    The first message must be an object with truthy ``node_id``, ``host`` and
    ``port``. This node answers with its own info and then processes every
    following message until the peer disconnects or an error occurs. The
    connection is always cleaned up on exit.
    """
    registered_id = None
    try:
      messages = self._iter_messages(conn)
      peer_info = next(messages, None)
      if not isinstance(peer_info, dict):
        peer_info = {}
      peer_id = peer_info.get('node_id')
      peer_host = peer_info.get('host')
      peer_port = peer_info.get('port')

      if peer_id and peer_host and peer_port:
        print(f"Received peer info from {peer_id} at {peer_host}:{peer_port}")
        self.add_peer(peer_id, conn, (peer_host, peer_port))
        registered_id = peer_id
        # Send our own node info to the peer
        my_info = json.dumps({'node_id': self.node_id, 'host': self.host, 'port': self.port})
        conn.sendall(my_info.encode('utf-8'))

        for message in messages:
          self.process_message(peer_id, message)
        print(f"Peer {peer_id} disconnected.")
      else:
        print(f"Invalid initial data from {addr}. Closing connection.")
    except Exception as e:
      print(f"Error handling client connection from {addr}: {e}")
    finally:
      self._release_connection(registered_id, conn)

  def connect_to_peer(self, peer_host, peer_port):
    """Dial another node and perform the handshake.

    Returns:
      ``True`` when the peer was newly connected and registered, ``False``
      when connecting to self, when the peer is already known, when the
      handshake response is invalid, or when any error occurs. The socket is
      closed in every ``False`` case.
    """
    if (peer_host, peer_port) == (self.host, self.port):
      print("Cannot connect to self.")
      return False

    s = None
    registered_id = None
    try:
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      s.connect((peer_host, peer_port))

      # Send our own node info to the peer
      my_info = json.dumps({'node_id': self.node_id, 'host': self.host, 'port': self.port})
      s.sendall(my_info.encode('utf-8'))

      # The peer answers with its own info. The same message iterator is
      # handed to the reader thread so that bytes received together with the
      # handshake response are not lost.
      messages = self._iter_messages(s)
      response_data = next(messages, None)
      peer_id = response_data.get('node_id') if isinstance(response_data, dict) else None

      if peer_id and peer_id not in self.peers:
        print(f"Successfully connected to peer {peer_id} at {peer_host}:{peer_port}")
        self.add_peer(peer_id, s, (peer_host, peer_port))
        registered_id = peer_id

        # Start a thread to listen for messages from this peer
        threading.Thread(target=self._listen_to_peer, args=(peer_id, s, messages), daemon=True).start()
        return True

      print(f"Already connected to or invalid response from {peer_host}:{peer_port}")
      self._close_socket(s, f"{peer_host}:{peer_port}")
      return False
    except Exception as e:
      print(f"Could not connect to peer {peer_host}:{peer_port}: {e}")
      if s is not None:
        self._release_connection(registered_id, s)
      return False

  def _listen_to_peer(self, peer_id, sock, messages=None):
    """Process messages from an outbound connection until it ends.

    Args:
      peer_id: id of the peer on the other end of ``sock``.
      sock: the connected socket.
      messages: optional message iterator already bound to ``sock`` (used to
        continue after the handshake); a new one is created when omitted.
    """
    if messages is None:
      messages = self._iter_messages(sock)
    try:
      for message in messages:
        self.process_message(peer_id, message)
      print(f"Peer {peer_id} disconnected during listen.")
    except Exception as e:
      print(f"Error listening to peer {peer_id}: {e}")
    finally:
      self._release_connection(peer_id, sock)

  def add_peer(self, peer_id, socket_obj, address_tuple):
    """Register a peer unless a peer with the same id is already known."""
    if peer_id not in self.peers:
      self.peers[peer_id] = (socket_obj, address_tuple)
      print(f"Added peer: {peer_id} from {address_tuple}")
    else:
      print(f"Peer {peer_id} already in peer list.")

  def remove_peer(self, peer_id):
    """Forget a peer and close its socket; unknown ids are ignored."""
    # pop() makes check-and-remove a single step, so two threads removing the
    # same peer cannot hit a KeyError.
    entry = self.peers.pop(peer_id, None)
    if entry is None:
      return
    socket_obj, _ = entry
    self._close_socket(socket_obj, f"peer {peer_id}")
    print(f"Removed peer: {peer_id}")

  def send_message_to_peer(self, peer_id, message_type, content=None):
    """Send ``{'sender_id', 'type', 'content'}`` as JSON to one peer.

    A payload that cannot be serialised is reported and nothing is sent; the
    connection stays up. A socket error while sending removes the peer.
    """
    entry = self.peers.get(peer_id)
    if entry is None:
      print(f"Peer {peer_id} not found in peer list.")
      return

    sock, _ = entry
    full_message = {'sender_id': self.node_id, 'type': message_type, 'content': content}
    try:
      payload = json.dumps(full_message).encode('utf-8')
    except (TypeError, ValueError) as e:
      # A caller-side bug must not cost us a healthy connection.
      print(f"Error sending message to {peer_id}: {e}")
      return

    try:
      sock.sendall(payload)
      print(f"Sent '{message_type}' to {peer_id}")
    except Exception as e:
      print(f"Error sending message to {peer_id}: {e}")
      self.remove_peer(peer_id)

  def broadcast_message(self, message_type, content=None):
    """Send the same message to every currently connected peer."""
    print(f"Broadcasting message: Type={message_type}, Content={content}")
    # Iterate over a snapshot: a failed send removes the peer from the dict.
    for peer_id in list(self.peers.keys()):
      self.send_message_to_peer(peer_id, message_type, content)

  def process_message(self, sender_id, message):
    """Handle one decoded message received from ``sender_id``.

    Supported types: ``text_message``, ``data_request``, ``data_update``,
    ``get_peers`` and ``peer_list``. Malformed messages are reported and
    ignored instead of raising, so a bad message does not drop the connection.
    Unknown types are logged and otherwise ignored.
    """
    if not isinstance(message, dict):
      print(f"Ignoring malformed message from {sender_id}: {message!r}")
      return

    message_type = message.get('type')
    content = message.get('content')
    print(f"Node {self.node_id} received message from {sender_id}: Type='{message_type}', Content='{content}'")

    # These types read fields out of `content`, so it has to be an object.
    needs_object = message_type in ('data_request', 'data_update', 'peer_list')
    if needs_object and not isinstance(content, dict):
      print(f"Ignoring '{message_type}' from {sender_id}: content must be an object.")
      return

    if message_type == 'text_message':
      print(f"  > Text message: {content}")
    elif message_type == 'data_request':
      key = content.get('key')
      if key in self.data_store:
        self.send_message_to_peer(sender_id, 'data_response', {'key': key, 'value': self.data_store[key]})
      else:
        self.send_message_to_peer(sender_id, 'data_response', {'key': key, 'value': None, 'error': 'not_found'})
    elif message_type == 'data_update':
      key = content.get('key')
      value = content.get('value')
      self.data_store[key] = value
      print(f"  > Stored data: {key} = {value}")
      # Optionally, rebroadcast the update to other peers
      # self.broadcast_message('data_update', content)
    elif message_type == 'get_peers':
      # Send back a list of this node's known peers (host, port)
      peer_addresses = [peer_info[1] for peer_info in list(self.peers.values())]
      self.send_message_to_peer(sender_id, 'peer_list', {'peers': peer_addresses})
    elif message_type == 'peer_list':
      # Connect to every advertised peer we do not know yet.
      new_peers = content.get('peers', [])
      if not isinstance(new_peers, list):
        print(f"Ignoring 'peer_list' from {sender_id}: 'peers' must be a list.")
        return
      for entry in new_peers:
        try:
          peer_host, peer_port = entry
        except (TypeError, ValueError):
          print(f"Ignoring malformed peer entry: {entry!r}")
          continue
        candidate = (peer_host, peer_port)
        # Recomputed per entry because connect_to_peer() below adds peers.
        known_addresses = [peer_info[1] for peer_info in list(self.peers.values())]
        if candidate != (self.host, self.port) and candidate not in known_addresses:
          print(f"Attempting to connect to new peer: {peer_host}:{peer_port}")
          self.connect_to_peer(peer_host, peer_port)

  def stop(self):
    """Stop listening and close every peer connection."""
    self.running = False
    server_socket, self.server_socket = self.server_socket, None
    if server_socket is not None:
      print(f"Closing server socket for Node {self.node_id}.")
      self._close_socket(server_socket, "server")
    for peer_id in list(self.peers.keys()):
      self.remove_peer(peer_id)
    print(f"Node {self.node_id} stopped.")

  ### Local data store (same dictionary that data_request/data_update use)
  def add_data(self, data):
    """Merge ``data`` into the data store.

    Args:
      data: a mapping (or iterable of ``(key, value)`` pairs), exactly what
        ``dict.update`` accepts. Existing keys are overwritten.
    """
    self.data_store.update(data)

  def get_data(self):
    """Return the live ``key -> value`` data store dictionary."""
    return self.data_store

  def remove_data(self, data):
    """Remove the entry stored under key ``data``.

    Raises:
      KeyError: no entry with that key exists.
    """
    del self.data_store[data]

In [19]:
# Build three nodes on localhost, one per port. All of them are constructed
# before any of them starts listening.
node1, node2, node3 = (Peer_node("127.0.0.1", port) for port in (8007, 8008, 8009))

# Bring up every listener before any connection is attempted.
for node in (node1, node2, node3):
    node.start()

time.sleep(1)  # Give the accept threads a moment to start

Node 354f8a05c517336e1929050f3db9e20517757ea252984a2793af897142938c1f initialized at 127.0.0.1:8007
Node fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf initialized at 127.0.0.1:8008
Node b9284ee4c4a27d1d0cb292a9cc2088a430d75bb5b2426430239ec106fd2a6cfc initialized at 127.0.0.1:8009
Node 354f8a05c517336e1929050f3db9e20517757ea252984a2793af897142938c1f listening on 127.0.0.1:8007
Node fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf listening on 127.0.0.1:8008
Node b9284ee4c4a27d1d0cb292a9cc2088a430d75bb5b2426430239ec106fd2a6cfc listening on 127.0.0.1:8009


In [20]:
# Show which id (hash of "host:port") belongs to which node variable.
for label, node in (("Node1", node1), ("Node2", node2), ("Node3", node3)):
    print(label, node.get_id())

Node1 354f8a05c517336e1929050f3db9e20517757ea252984a2793af897142938c1f
Node2 fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf
Node3 b9284ee4c4a27d1d0cb292a9cc2088a430d75bb5b2426430239ec106fd2a6cfc


In [21]:
 # Connect the nodes in a ring: node1 -> node2 -> node3 -> node1.
 # The sleeps give the background threads time to finish each handshake.
node1.connect_to_peer("127.0.0.1", 8008)
time.sleep(1)
node2.connect_to_peer("127.0.0.1", 8009)
time.sleep(1)
node3.connect_to_peer("127.0.0.1", 8007) # Closes the ring, so each node ends up with two peers

time.sleep(2) # Give connections a moment to establish

print("\n--- Sending Messages ---")
# One direct message, then a broadcast to all of node2's peers.
node1.send_message_to_peer(node2.node_id, 'text_message', 'Hello from Node 1 to Node 2!')
time.sleep(0.5)
node2.broadcast_message('text_message', 'Greetings from Node 2 to everyone!')
time.sleep(0.5)

print("\n--- Data Operations ---")
# Store a key/value pair in node2's data store.
node1.send_message_to_peer(node2.node_id, 'data_update', {'key': 'my_data', 'value': 'secret_value'})
time.sleep(0.5)
# NOTE: 'my_data' was stored on node2 only (updates are not rebroadcast),
# so node1 answers this request with error 'not_found'.
node3.send_message_to_peer(node1.node_id, 'data_request', {'key': 'my_data'})
time.sleep(0.5)
# Request a key that was never stored: expected answer is 'not_found'.
node3.send_message_to_peer(node2.node_id, 'data_request', {'key': 'non_existent_key'})
time.sleep(0.5)

print("\n--- Peer Discovery Example ---")
# Node 1 asks Node 2 for its peers; Node 2 answers with a 'peer_list' message
# and Node 1 connects to every listed address it does not know yet.
node1.send_message_to_peer(node2.node_id, 'get_peers')
time.sleep(1) # Give time for peer list to be sent back and processed

Incoming connection from ('127.0.0.1', 34302)
Received peer info from 354f8a05c517336e1929050f3db9e20517757ea252984a2793af897142938c1f at 127.0.0.1:8007
Added peer: 354f8a05c517336e1929050f3db9e20517757ea252984a2793af897142938c1f from ('127.0.0.1', 8007)
Successfully connected to peer fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf at 127.0.0.1:8008
Added peer: fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf from ('127.0.0.1', 8008)
Incoming connection from ('127.0.0.1', 41748)
Received peer info from fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf at 127.0.0.1:8008
Added peer: fbd7416735a39a2abca7227cfec89912a777fa71e328bea87bc1a95425ca9aaf from ('127.0.0.1', 8008)
Successfully connected to peer b9284ee4c4a27d1d0cb292a9cc2088a430d75bb5b2426430239ec106fd2a6cfc at 127.0.0.1:8009
Added peer: b9284ee4c4a27d1d0cb292a9cc2088a430d75bb5b2426430239ec106fd2a6cfc from ('127.0.0.1', 8009)
Incoming connection from ('127.0.0.1', 40390)
Received pee

In [6]:
# Inspect node1's peer table: peer_id -> (socket, (host, port)).
print(node1.get_peers())
# print(node2.get_peers())
# print(node3.get_peers())

{'595ae01ad7880d5b6bc138782c0acfc41a568eb813b34f51cfd219a9de0cd6be': (<socket.socket fd=56, family=2, type=1, proto=0, laddr=('127.0.0.1', 58188), raddr=('127.0.0.1', 8002)>, ('127.0.0.1', 8002)), 'a0c9a9eff820c8c0c32a51b241c388d978cefcfb08c9204d34157be6ee178311': (<socket.socket fd=51, family=2, type=1, proto=0, laddr=('127.0.0.1', 8001), raddr=('127.0.0.1', 51226)>, ('127.0.0.1', 8003))}


In [17]:
# Shut every node down in creation order; stop() closes the listening socket
# and all peer connections of that node.
print("Stopping nodes...")
for node in (node1, node2, node3):
    node.stop()

Stopping nodes...
Closing server socket for Node 6ec3d7e5c286923599d32a4e22b5621c8f71c05b683a22a2346507a1c2130f7e.
Node 6ec3d7e5c286923599d32a4e22b5621c8f71c05b683a22a2346507a1c2130f7e stopped.
Closing server socket for Node 01518d57f4f9ee015d52f4be059bf95de672f95467ee4e6d52037bb076f6eccf.
Node 01518d57f4f9ee015d52f4be059bf95de672f95467ee4e6d52037bb076f6eccf stopped.
Closing server socket for Node bda80c3e1dc11723ab45f6c7d89ecc9eaab856f08f5ba3bfc9307ef2c57eb6db.
Node bda80c3e1dc11723ab45f6c7d89ecc9eaab856f08f5ba3bfc9307ef2c57eb6db stopped.
