# In [None]:
################    SERVER    ##########################
import asyncio
import websockets
import json
import random
import aiortc
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate
import uuid
import ssl
import os
# Auction State
# These module-level values are the single shared auction; process_bid and
# handle_client rebind current_bid / current_bidder via `global`.
auction_item = "Example Item"
current_bid = 0  # highest accepted bid so far (0 means no bid yet)
current_bidder = None  # websocket of the highest bidder, or None
auction_open = True  # only read (and broadcast) in this cell; nothing here sets it to False

# Client Management
connected_clients = set()  # every websocket currently served by handle_client
host_client = None  # websocket granted the host role by handle_host_request, or None

# stun server configuration (public google stun server)
# NOTE(review): not referenced by any function in this cell.
stun_server = "stun:stun.l.google.com:19302"

# Redis (Optional - for faster bid processing)
# import redis
# r = redis.Redis(host='localhost', port=6379, db=0) #Uncomment if you wish to use redis


# Function to generate a unique ID for each peer connection
def generate_uuid():
    """Return a newly generated random (version 4) UUID in its string form."""
    fresh_id = uuid.uuid4()
    return f"{fresh_id}"

# SSL Context for secure WebSocket connections
# The certificate and private key are expected next to this source file and are
# loaded at import time, so a missing file fails before the server starts.
# NOTE(review): `__file__` is presumably undefined when this runs as a notebook
# cell rather than as a script - confirm how this cell is executed.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
certfile_path = os.path.join(os.path.dirname(__file__), "ssl_cert.pem")
keyfile_path = os.path.join(os.path.dirname(__file__), "ssl_key.pem")
ssl_context.load_cert_chain(certfile_path, keyfile_path)


# Function to handle bid validation and processing
async def process_bid(websocket, bid_amount):
    """Validate a bid and, if it beats the current one, record and announce it.

    Args:
        websocket: Connection of the client placing the bid; it becomes
            ``current_bidder`` when the bid is accepted.
        bid_amount: Raw ``"amount"`` value from the client's JSON message.
            It may be a number, a numeric string, ``None`` (field missing) or
            any other JSON value.

    Side effects:
        On success rebinds the module-level ``current_bid`` and
        ``current_bidder`` and broadcasts the new auction state to every
        client. On failure sends an ``error`` message to the bidder only.
    """
    global current_bid, current_bidder
    import math  # local import keeps this block self-contained

    try:
        bid_amount = float(bid_amount)
    except (TypeError, ValueError):
        # TypeError covers a missing amount (None) and non-scalar JSON values
        # such as lists or objects; ValueError covers non-numeric strings.
        await send_message(websocket, {"type": "error", "message": "Invalid bid amount"})
        return

    if not math.isfinite(bid_amount):
        # An infinite bid could never be outbid and is not representable in
        # strict JSON; NaN compares false against everything.
        await send_message(websocket, {"type": "error", "message": "Invalid bid amount"})
        return

    if bid_amount <= current_bid:
        await send_message(websocket, {"type": "error", "message": "Bid too low"})
        return

    current_bid = bid_amount
    current_bidder = websocket
    print(f"New bid: {bid_amount} from {websocket.remote_address}")
    # Kept outside the try block so an error raised while broadcasting is not
    # misreported to the bidder as an invalid amount.
    await broadcast_auction_state()


# Function to handle client-to-host switch requests
async def handle_host_request(websocket):
    """Grant the host role to ``websocket`` unless another client already holds it.

    The requester receives ``host_accepted`` when the role was free, or an
    ``error`` message when a host is already registered.
    """
    global host_client

    # Guard clause: only one host may exist at a time.
    if host_client is not None:
        await send_message(websocket, {"type": "error", "message": "Host already exists"})
        return

    host_client = websocket
    print(f"{websocket.remote_address} is now the host.")
    await send_message(websocket, {"type": "host_accepted"})


# Function to send a message to a specific client
async def send_message(websocket, message):
    """Serialize ``message`` to JSON and send it to a single client.

    Args:
        websocket: Target connection; must provide an awaitable ``send`` and a
            ``remote_address`` attribute.
        message: JSON-serializable object (the callers in this file pass dicts).

    A connection that is already closed is logged and otherwise ignored, so a
    vanished peer never aborts the caller.
    """
    try:
        await websocket.send(json.dumps(message))
    except websockets.exceptions.ConnectionClosed:
        # ConnectionClosed is the common base of ConnectionClosedError and
        # ConnectionClosedOK: a send can also race with a *clean* close, which
        # the narrower ConnectionClosedError handler let escape.
        print(f"Connection closed while sending message to {websocket.remote_address}")


# Function to broadcast the current auction state to all connected clients
async def broadcast_auction_state():
    """Send a JSON snapshot of the auction to every connected client.

    The snapshot carries the item, the current bid, the leading bidder's
    address (first element of ``remote_address``, or ``None`` when nobody has
    bid) and whether the auction is open.
    """
    if current_bidder:
        leading_bidder = current_bidder.remote_address[0]
    else:
        leading_bidder = None

    snapshot = {"type": "auction_state"}
    snapshot["item"] = auction_item
    snapshot["current_bid"] = current_bid
    snapshot["current_bidder"] = leading_bidder
    snapshot["auction_open"] = auction_open

    payload = json.dumps(snapshot)
    await broadcast(payload)


# Function to broadcast a message to all connected clients
async def broadcast(message):
    """Send an already-serialized ``message`` to every connected client.

    Args:
        message: Text payload passed unchanged to each client's ``send``.

    Failures for individual clients are logged and do not stop delivery to
    the others, nor do they propagate to the caller.
    """
    if not connected_clients:
        return

    # Snapshot the set: clients may connect or disconnect while the sends
    # are in flight.
    recipients = list(connected_clients)

    # asyncio.wait() no longer accepts bare coroutines (TypeError on Python
    # 3.11+); gather() schedules them itself. return_exceptions=True keeps
    # one broken connection from cancelling the rest.
    outcomes = await asyncio.gather(
        *(client.send(message) for client in recipients),
        return_exceptions=True,
    )
    for client, outcome in zip(recipients, outcomes):
        if isinstance(outcome, Exception):
            print(f"Failed to send to {client.remote_address}: {outcome}")


# Asyncio Task to handle each client connection
async def handle_client(websocket, path=None):
    """Serve one auction client for the lifetime of its WebSocket connection.

    Registers the client, sends it a greeting and the current auction state,
    then dispatches each incoming JSON message by its ``"type"`` field:
    ``bid`` -> process_bid, ``host_request`` -> handle_host_request,
    ``message`` -> chat text broadcast to everyone. Anything else is answered
    with an ``error`` message.

    Args:
        websocket: The client connection.
        path: Request path supplied by websockets versions that call the
            handler with two arguments; unused. The default lets versions
            that pass only the connection call this handler too.

    On disconnect the client is unregistered; if it was the host the host
    role is freed, and if it held the highest bid the bid is reset to 0 and
    the new state is broadcast.
    """
    # Must precede every use of these names in the function: declaring them
    # `global` after reading them (as the cleanup code used to) is a
    # SyntaxError that prevents the whole module from compiling.
    global current_bid, current_bidder, host_client

    connected_clients.add(websocket)
    print(f"Client connected: {websocket.remote_address}")
    try:
        await send_message(websocket, {"type": "connected", "message": "Connected to the server"})
        await broadcast_auction_state()
        async for message in websocket:
            try:
                data = json.loads(message)
                # Valid JSON that is not an object (e.g. a list or number) has
                # no "type"; treat it as an unknown message type.
                message_type = data.get("type") if isinstance(data, dict) else None

                if message_type == "bid":
                    bid_amount = data.get("amount")
                    await process_bid(websocket, bid_amount)
                elif message_type == "host_request":
                    await handle_host_request(websocket)
                elif message_type == "message":
                    # Handle generic messages (e.g., chat). Note this is sent
                    # as plain text, not JSON.
                    content = data.get("content")
                    await broadcast(f"Client {websocket.remote_address}: {content}")
                else:
                    await send_message(websocket, {"type": "error", "message": "Invalid message type"})
            except json.JSONDecodeError:
                await send_message(websocket, {"type": "error", "message": "Invalid JSON format"})
            except websockets.exceptions.ConnectionClosed:
                # A closed connection is not an internal error; let the outer
                # handlers deal with it.
                raise
            except Exception as e:
                print(f"Error processing message: {e}")
                await send_message(websocket, {"type": "error", "message": "Internal server error"})

    except websockets.exceptions.ConnectionClosedError:
        print(f"Client disconnected unexpectedly: {websocket.remote_address}")
    except websockets.exceptions.ConnectionClosedOK:
        # Clean close; the disconnect is logged once in the cleanup below.
        pass
    finally:
        # discard() cannot raise if the client was already removed.
        connected_clients.discard(websocket)
        if websocket is host_client:
            # Free the host role so another client can claim it.
            host_client = None
        if websocket is current_bidder:
            current_bid = 0
            current_bidder = None
            await broadcast_auction_state()
        print(f"Client disconnected: {websocket.remote_address}")


# Main function to start the WebSocket server
async def main():
    """Start the secure auction WebSocket server and serve until cancelled.

    Listens on localhost:8765 using the module-level ``ssl_context`` and
    hands every connection to ``handle_client``.
    """
    print("Starting WebSocket server...")
    # Using localhost for simplicity
    async with websockets.serve(handle_client, "localhost", 8765, ssl=ssl_context):
        # Awaiting serve() alone only *starts* the server; main() would then
        # return and asyncio.run() would close the loop, stopping the server
        # immediately. Block on a future that never completes instead.
        await asyncio.Future()


# Script entry point: run the auction server's main() coroutine on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())

# In [None]:
##################   Client 

import asyncio
import websockets
import json

async def connect_to_server():
    """Run the interactive console client for the auction server.

    Connects over TLS, prints every server message from a background task,
    and reads commands from stdin: ``bid <amount>``, ``host_request``,
    ``message <content>`` and ``quit``. Connection problems are reported on
    stdout rather than raised.
    """
    uri = "wss://localhost:8765"  # Replace with your server's URI
    try:
        # NOTE(review): ssl=True asks for default certificate verification; a
        # self-signed server certificate will presumably be rejected - confirm.
        async with websockets.connect(uri, ssl=True) as websocket:
            print("Connected to the server")

            async def receive_messages():
                """Print each incoming server message until the connection ends."""
                try:
                    async for message in websocket:
                        try:
                            data = json.loads(message)
                            message_type = data.get("type")

                            if message_type == "connected":
                                print(data.get("message"))
                            elif message_type == "auction_state":
                                print("Auction State:")
                                print(f"  Item: {data.get('item')}")
                                print(f"  Current Bid: {data.get('current_bid')}")
                                print(f"  Current Bidder: {data.get('current_bidder')}")
                                print(f"  Auction Open: {data.get('auction_open')}")
                            elif message_type == "error":
                                print(f"Error: {data.get('message')}")
                            elif message_type == "host_accepted":
                                print("You are now the host!")
                            else:
                                print(f"Received: {message}")
                        except json.JSONDecodeError:
                            print(f"Received non-JSON message: {message}")
                except websockets.exceptions.ConnectionClosedError as e:
                    print(f"Connection closed unexpectedly: {e}")
                except Exception as e:
                    print(f"Error receiving messages: {e}")

            receive_task = asyncio.create_task(receive_messages())
            loop = asyncio.get_running_loop()

            try:
                while True:
                    try:
                        # input() blocks, so run it in the default executor to
                        # keep the event loop (and the receive task) running.
                        user_input = await loop.run_in_executor(
                            None,
                            input,
                            "Enter command (bid <amount>, host_request, message <content>, quit): ",
                        )
                    except EOFError:
                        # stdin was closed: treat it like "quit".
                        break

                    if user_input.lower() == "quit":
                        break

                    if user_input.startswith("bid "):
                        try:
                            amount = float(user_input[4:])
                            message = {"type": "bid", "amount": amount}
                            await websocket.send(json.dumps(message))
                        except ValueError:
                            print("Invalid bid amount. Please enter a number.")
                    elif user_input == "host_request":
                        message = {"type": "host_request"}
                        await websocket.send(json.dumps(message))
                    elif user_input.startswith("message "):
                        content = user_input[8:]
                        message = {"type": "message", "content": content}
                        await websocket.send(json.dumps(message))
                    else:
                        print("Invalid command.")
            finally:
                # Stop the receiver on every exit path (quit, EOF or a failed
                # send), not only on a normal "quit". Leaving the `async with`
                # block then closes the connection.
                receive_task.cancel()
                try:
                    await receive_task
                except asyncio.CancelledError:
                    pass

    except ConnectionRefusedError:
        # Built-in exception; websockets.exceptions has no such attribute, so
        # the old clause raised AttributeError while handling any failure.
        print("Connection refused. Ensure the server is running.")
    except Exception as e:
        print(f"Connection error: {e}")

# Script entry point: run the interactive auction client until it returns.
if __name__ == "__main__":
    asyncio.run(connect_to_server())


# In [None]:
####################################### VIDEO STREAM ########################################
import asyncio
import aiortc
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate
import json
import os
import ssl
import cv2  # You'll need OpenCV
import numpy as np

# Configuration
WIDTH, HEIGHT = 640, 480  # frame size in pixels (used for generated frames and camera setup)
FRAME_RATE = 30  # target frames per second
USE_FAKE_CAMERA = True  # set to False if you have a real camera connected

# stun server configuration (public google stun server)
# NOTE(review): not passed to RTCPeerConnection anywhere in this cell.
stun_server = "stun:stun.l.google.com:19302"

# In-memory offer/answer exchange (replace with a more robust mechanism in a real app)
# run() publishes the local offer in offer_sdp and polls answer_sdp until it
# is set; nothing in this cell assigns answer_sdp.
offer_sdp = None
answer_sdp = None

# SSL Context for secure WebSocket connections (if needed)
# The certificate and key are loaded at import time from files next to this
# source file; no function in this cell uses the resulting context.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
certfile_path = os.path.join(os.path.dirname(__file__), "ssl_cert.pem")
keyfile_path = os.path.join(os.path.dirname(__file__), "ssl_key.pem")
ssl_context.load_cert_chain(certfile_path, keyfile_path)


class VideoTrack(aiortc.MediaStreamTrack):
    """Video track that produces either a synthetic green frame or camera frames.

    Frames are paced to ``FRAME_RATE`` and stamped with monotonically
    increasing presentation timestamps on a 90 kHz clock.
    """

    kind = "video"

    def __init__(self):
        super().__init__()  # required: the base class sets up the track's own state
        self.frame_interval = 1.0 / FRAME_RATE  # seconds between frames
        self.last_frame_time = None  # monotonic time of the previous frame, None before the first
        self._frame_count = 0  # frames produced so far; drives the pts

    async def recv(self):
        """Produce the next video frame.

        If USE_FAKE_CAMERA is true, generates a solid green test frame.
        Otherwise reads from the module-level ``camera`` (an OpenCV capture
        created by the script entry point) and falls back to a black frame
        when the read fails.

        Returns:
            An ``av.VideoFrame`` in ``bgr24`` format with ``pts`` and
            ``time_base`` set.
        """
        # Imported here so the track does not depend on names that the file
        # only binds inside its `if __name__ == "__main__"` block.
        import fractions
        import time

        import av

        # Pace output to FRAME_RATE; without this the consumer would be fed
        # frames as fast as they can be generated.
        if self.last_frame_time is not None:
            remaining = self.last_frame_time + self.frame_interval - time.monotonic()
            if remaining > 0:
                await asyncio.sleep(remaining)
        self.last_frame_time = time.monotonic()

        if USE_FAKE_CAMERA:
            # Generate a test pattern (green screen)
            frame = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
            frame[:, :, 1] = 128  # Green channel (index 1 in BGR order)
        else:
            # Attempt to read from the camera.
            # NOTE(review): camera.read() is a blocking call executed on the
            # event loop thread.
            ret, frame = camera.read()
            if not ret:
                print("Error reading from camera.  Returning a black frame.")
                frame = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)

        # av.time_base is a plain integer, so it cannot be unpacked into
        # (pts, time_base). Stamp frames on the 90 kHz clock conventionally
        # used for RTP video instead, advancing by one frame period each call.
        clock_rate = 90000
        new_frame = av.VideoFrame.from_ndarray(frame, format="bgr24")
        new_frame.pts = int(self._frame_count * clock_rate / FRAME_RATE)
        new_frame.time_base = fractions.Fraction(1, clock_rate)
        self._frame_count += 1

        return new_frame


async def run():
    """Offer the local video track to a peer and keep the connection alive.

    Creates a peer connection carrying a ``VideoTrack``, publishes the local
    offer (printed and stored in the module-level ``offer_sdp``), waits until
    the module-level ``answer_sdp`` is set, applies it as the remote
    description and then idles until cancelled. The peer connection is always
    closed on the way out.
    """
    global offer_sdp

    # Create peer connection
    pc = RTCPeerConnection()
    try:
        # Add video track
        pc.addTrack(VideoTrack())

        # Create offer
        offer = await pc.createOffer()
        await pc.setLocalDescription(offer)

        print(f"Offer (copy and paste to the client):\n{pc.localDescription.sdp}")
        offer_sdp = pc.localDescription.sdp

        # Wait for answer.
        # NOTE(review): nothing visible in this cell assigns answer_sdp, so
        # this polls until some other code sets it.
        while answer_sdp is None:
            await asyncio.sleep(1)

        print(f"Answer received:\n{answer_sdp}")

        # Set remote description
        await pc.setRemoteDescription(
            RTCSessionDescription(sdp=answer_sdp, type="answer")
        )

        # Keep the coroutine (and with it the connection) alive until the
        # task is cancelled, e.g. by Ctrl+C under asyncio.run().
        while True:
            await asyncio.sleep(1)
    finally:
        # Release sockets and media resources on cancellation or error
        # instead of leaking the peer connection.
        await pc.close()


# Script entry point for the video streamer.
if __name__ == "__main__":
    # These imports bind `av` and `time` as module-level names only when the
    # file is executed as a script.
    import av
    import time

    if not USE_FAKE_CAMERA:
        # Initialize the camera (if not using a fake camera).
        # `camera` becomes a module-level name that VideoTrack.recv reads.
        camera = cv2.VideoCapture(0)  # 0 is usually the default camera
        camera.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
        camera.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
        if not camera.isOpened():
            raise IOError("Cannot open webcam")

    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the streamer; exit quietly.
        pass
    finally:
        print("Exiting")
        if not USE_FAKE_CAMERA:
            camera.release()  # Release the camera



# In [None]:
################################### Signaling server #####################################
import asyncio
import websockets
import json
import ssl
import os

# In-memory storage for offer and answer
# Both dicts map the "client_id" sent by a peer to the SDP text it submitted;
# entries are never removed and are lost when the process exits.
offer_data = {}
answer_data = {}

# SSL Context for secure WebSocket connections
# The certificate and key are loaded at import time from files next to this
# source file.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
certfile_path = os.path.join(os.path.dirname(__file__), "ssl_cert.pem")
keyfile_path = os.path.join(os.path.dirname(__file__), "ssl_key.pem")
ssl_context.load_cert_chain(certfile_path, keyfile_path)

async def signaling(websocket, path=None):
    """Store and hand out SDP offers/answers for one signaling connection.

    Each incoming message must be a JSON object with a ``"type"``:

    * ``offer`` / ``answer`` - store ``"sdp"`` under ``"client_id"`` and
      acknowledge with an ``info`` message.
    * ``get_offer`` / ``get_answer`` - reply with the SDP stored for
      ``"client_id"``, or an ``error`` message when none is stored.

    Unknown types are only logged. Messages that are not valid JSON objects
    are answered with an ``error`` message instead of ending the connection.

    Args:
        websocket: The peer's connection.
        path: Request path supplied by websockets versions that call the
            handler with two arguments; unused. The default lets versions
            that pass only the connection call this handler too.
    """
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                data = None
            if not isinstance(data, dict):
                # Covers both unparsable text and valid JSON that is not an
                # object; previously either one raised and dropped the peer.
                await websocket.send(json.dumps({"type": "error", "message": "Invalid JSON format"}))
                continue

            message_type = data.get("type")

            if message_type == "offer":
                offer_sdp = data.get("sdp")
                client_id = data.get("client_id")
                offer_data[client_id] = offer_sdp
                print(f"Received offer from client {client_id}:\n{offer_sdp}")
                await websocket.send(json.dumps({"type": "info", "message": "Offer Received"}))
            elif message_type == "answer":
                answer_sdp = data.get("sdp")
                client_id = data.get("client_id")
                answer_data[client_id] = answer_sdp
                print(f"Received answer from client {client_id}:\n{answer_sdp}")
                await websocket.send(json.dumps({"type": "info", "message": "Answer Received"}))
            elif message_type == "get_offer":
                client_id = data.get("client_id")
                sdp = offer_data.get(client_id)
                if sdp:
                    await websocket.send(json.dumps({"type": "offer", "sdp": sdp}))
                else:
                    await websocket.send(json.dumps({"type": "error", "message": "Offer not found"}))
            elif message_type == "get_answer":
                client_id = data.get("client_id")
                sdp = answer_data.get(client_id)
                if sdp:
                    await websocket.send(json.dumps({"type": "answer", "sdp": sdp}))
                else:
                    await websocket.send(json.dumps({"type": "error", "message": "Answer not found"}))
            else:
                print(f"Received unknown message type: {message_type}")

    except websockets.exceptions.ConnectionClosedError:
        print("Connection closed unexpectedly")
    except websockets.exceptions.ConnectionClosedOK:
        print("Connection closed")

async def main():
    """Start the secure signaling server and serve until cancelled.

    Listens on localhost:8766 using the module-level ``ssl_context`` and
    hands every connection to ``signaling``.
    """
    print("Starting signaling server...")
    async with websockets.serve(signaling, "localhost", 8766, ssl=ssl_context):
        # Awaiting serve() alone only *starts* the server; main() would then
        # return and asyncio.run() would close the loop, stopping the server
        # immediately. Block on a future that never completes instead.
        await asyncio.Future()

# Script entry point: run the signaling server's main() coroutine on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())


# In [None]:
########################### video client ###################################


import asyncio
import aiortc
from aiortc import RTCPeerConnection, RTCSessionDescription
import json
import ssl
import os
import uuid
import websockets

# Configuration
signaling_server_url = "wss://localhost:8766"
client_id = str(uuid.uuid4())  # Generate a unique client ID (new on every run)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
certfile_path = os.path.join(os.path.dirname(__file__), "ssl_cert.pem")
keyfile_path = os.path.join(os.path.dirname(__file__), "ssl_key.pem")
# NOTE(review): this loads the same certificate/key file names the servers in
# this file use, i.e. the client presents them as its own identity - confirm
# that shipping the private key to clients is intended.
ssl_context.load_cert_chain(certfile_path, keyfile_path)
# The next two lines disable hostname checking and certificate verification,
# so the client accepts any server certificate.
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Global variable to hold the signaling websocket
# (set by connect_signaling_server, read by the send/receive helpers)
signaling_ws = None

async def connect_signaling_server():
    """Open the signaling WebSocket and remember it in ``signaling_ws``.

    Returns:
        The connected websocket, or ``None`` when connecting failed (the
        error is printed, not raised).
    """
    global signaling_ws
    try:
        connection = await websockets.connect(signaling_server_url, ssl=ssl_context)
        signaling_ws = connection
        print("Connected to signaling server")
    except Exception as e:
        print(f"Error connecting to signaling server: {e}")
        return None
    else:
        return signaling_ws

async def send_to_signaling_server(message):
    """JSON-encode ``message`` and send it over the signaling websocket.

    When there is no open signaling connection the message is dropped and a
    notice is printed instead.
    """
    global signaling_ws

    # Guard clause: bail out early when the socket is missing or not open.
    if not signaling_ws or not signaling_ws.open:
        print("Not connected to signaling server, cannot send message")
        return

    encoded = json.dumps(message)
    await signaling_ws.send(encoded)

async def receive_from_signaling_server():
    """Wait for the next signaling message and return it decoded.

    Returns:
        The first message received on ``signaling_ws``, parsed from JSON into
        a dict. ``None`` when the connection ends before a message arrives,
        when the message is not a JSON object, or when receiving fails (the
        reason is printed, not raised).
    """
    global signaling_ws
    try:
        # Only the first message is consumed: the loop body always returns.
        async for message in signaling_ws:
            data = json.loads(message)
            if not isinstance(data, dict):
                # Callers use dict methods on the result; report the problem
                # here rather than failing with an opaque AttributeError.
                print(
                    "Error receiving from signaling server: "
                    f"expected a JSON object, got {type(data).__name__}"
                )
                return None
            return data
    except Exception as e:
        print(f"Error receiving from signaling server: {e}")
        return None
    # The connection closed cleanly before any message arrived.
    return None

async def run():
    pc = RTCPeerConnection()

    @pc.on("track")
    async def on_track(track):
        print(f"Track {track.kind} received")
        if track.kind == "video":
            # Here, you would typically render the video to a UI element.
            # Since this is a console application, we'll just print some info.
            print("Receiving video - rendering would happen here.")
            while True:
                frame = await track.recv()
                print(f"Received frame with timestamp: {frame.pts}")
                await asyncio.sleep(0.1)

    # Connect to the signaling server
    signaling_ws = await connect_signaling_server()

    # Get offer from the signaling server
    await send_to_signaling_server({"type": "get_offer", "client_id": client_id})
    offer_data = await receive_from_signaling_server()
    offer_sdp = offer_data.get("sdp")
    print(f"offer_data {offer_data}")

    # Set remote description
    await pc.setRemoteDescription(
        RTCSessionDescription(sdp=offer_sdp, type="offer")
    )
