In [1]:
import threading
import cv2
import mediapipe as mp
import socket
import json

# Define a flag to control when to run the extraction function
run_extraction = False

# Function to extract wrist coordinates using MediaPipe
def extract_wrist_coordinates(client_socket):
    mp_drawing = mp.solutions.drawing_utils
    mp_hands = mp.solutions.hands

    cap = cv2.VideoCapture(1)
    hands = mp_hands.Hands()

    while run_extraction:
        try:
            ret, frame = cap.read()
            if not ret:
                break

            frame = cv2.flip(frame, 1)
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = hands.process(rgb_frame)

            if results.multi_hand_landmarks:
                for landmarks in results.multi_hand_landmarks:
                    wrist_landmark = landmarks.landmark[mp_hands.HandLandmark.WRIST]
                    wrist_coords = {'x': wrist_landmark.x, 'y': wrist_landmark.y, 'z': wrist_landmark.z}
                    print(wrist_coords)
                    json_coords = json.dumps(wrist_coords)
                    client_socket.send((json_coords+ '\n').encode())

            cv2.imshow('MediaPipe Hands', frame)
            if cv2.waitKey(1) == 27:
                break
        except Exception as e:
            print("Error:", e)
            break

    # Release the camera and close OpenCV windows when done
    cap.release()
    cv2.destroyAllWindows()

# Function to start the server socket
def server_socket():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('localhost', 12345))
    server.listen(1)
    print("Server is listening on port 12345...")

    while True:
        client_socket, addr = server.accept()
        print(f"Connection from {addr[0]}:{addr[1]}")
        
        global run_extraction  # Use the global flag
        
        # Set the flag to True when the connection is open
        run_extraction = True

        # Start the hand thread after the client connects
        hand_thread = threading.Thread(target=extract_wrist_coordinates, args=(client_socket,))
        hand_thread.start()

        # Wait for the hand_thread to finish (connection terminated)
        hand_thread.join()

        # Reset the flag to False when the connection is terminated
        run_extraction = False

# Create and start the socket thread
socket_thread = threading.Thread(target=server_socket)
socket_thread.start()

socket_thread.join()


Server is listening on port 12345...


In [None]:
import socket

def close_socket_by_port(port):
    try:
        # Create a socket and connect to the localhost on the specified port
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client_socket.connect(('localhost', port))

        # Close the socket connection
        client_socket.close()
        print(f"Socket connection on port {port} closed successfully.")
    except ConnectionRefusedError:
        print(f"No socket connection found on port {port}.")

# Example usage:
# Close a socket connection on port 12345
close_socket_by_port(12345)
