In [None]:
import socket
import cv2
import numpy as np
import threading
import logging
import time
import atexit
from flask import Flask, Response
from collections import deque
from concurrent.futures import ThreadPoolExecutor
import io
from typing import Optional

# --- Configuration ---
TCP_HOST = '0.0.0.0'
TCP_PORT = 8000
FLASK_PORT = 8001
NUM_CAMERAS = 2
QUEUE_SIZE = 5
JPEG_QUALITY = 75
MAX_FRAME_SIZE = 15 * 1024 * 1024
MJPEG_SLEEP_TIME_IDLE = 0.1
MJPEG_SLEEP_TIME_ACTIVE = 0.01
SOCKET_TIMEOUT = 10

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
)

# --- Global Data Structures ---
deq_merged = deque(maxlen=QUEUE_SIZE)
deq_split = [deque(maxlen=QUEUE_SIZE) for _ in range(NUM_CAMERAS)]
EMPTY_FRAME_PLACEHOLDER = b''
executor = ThreadPoolExecutor(max_workers=NUM_CAMERAS + 2, thread_name_prefix='FrameProcessor')


def recv_all(sock: socket.socket, n: int) -> Optional[bytes]:
    data = bytearray()
    while len(data) < n:
        try:
            packet = sock.recv(n - len(data))
            if not packet:
                logging.warning(f"Connection closed while trying to receive {n} bytes (got {len(data)}).")
                return None
            data.extend(packet)
        except socket.timeout:
            logging.warning(f"Socket timeout receiving {n} bytes (got {len(data)}).")
            return None
        except OSError as e:
            logging.error(f"Socket error receiving {n} bytes (got {len(data)}): {e}")
            return None
    return bytes(data)


def handle_client(conn: socket.socket, addr: tuple):
    logging.info(f"Connected to {addr}")
    conn.settimeout(SOCKET_TIMEOUT)
    try:
        while True:
            length_bytes = recv_all(conn, 4)
            if length_bytes is None:
                # logging.info(f"Client {addr} disconnected or failed to send length.")
                break

            try:
                length = int.from_bytes(length_bytes, byteorder='big')
            except Exception:
                # logging.error(f"Failed to decode length from {addr}. Data: {length_bytes!r}")
                break

            if length <= 0 or length > MAX_FRAME_SIZE:
                # logging.warning(f"Invalid frame length received from {addr}: {length}. Max: {MAX_FRAME_SIZE}. Disconnecting.")
                break

            frame_data = recv_all(conn, length)
            if frame_data is None:
                # logging.warning(f"Incomplete frame data from {addr}. Expected {length} bytes.")
                break

            executor.submit(process_frame, frame_data, addr)

    except socket.timeout:
         logging.warning(f"Socket timeout for client {addr}.")
    except ConnectionResetError:
         logging.warning(f"Connection reset by peer {addr}")
    except Exception as e:
        logging.error(f"Unexpected error with client {addr}: {e}", exc_info=False)
    finally:
        conn.close()
        logging.info(f"Disconnected from {addr}")


def process_frame(frame_bytes: bytes, addr: tuple):
    original_frame_added_to_merged = False
    try:
        deq_merged.append(frame_bytes)
        original_frame_added_to_merged = True

        frame_np = np.frombuffer(frame_bytes, dtype=np.uint8)
        merged_frame = cv2.imdecode(frame_np, cv2.IMREAD_COLOR)

        if merged_frame is None:
            logging.error(f"Failed to decode frame from {addr}. Size: {len(frame_bytes)} bytes.")
            if original_frame_added_to_merged and deq_merged and deq_merged[-1] is frame_bytes:
                try:
                    deq_merged.pop()
                except IndexError: pass
            return

        height, width, channels = merged_frame.shape
        if channels != 3:
             # logging.warning(f"Frame from {addr} has {channels} channels, expected 3.")
             return

        frame_width = width // NUM_CAMERAS

        if frame_width <= 0:
             # logging.error(f"Calculated frame_width <= 0 ({frame_width}) for merged width {width}.")
             return

        encode_param = [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]

        for i in range(NUM_CAMERAS):
            start_col = i * frame_width
            end_col = (i + 1) * frame_width if i < NUM_CAMERAS - 1 else width
            split_frame_slice = merged_frame[:, start_col:end_col]

            if split_frame_slice.size > 0 and split_frame_slice.shape[1] > 0:
                success, split_jpg_data = cv2.imencode('.jpg', split_frame_slice, encode_param)
                if success:
                    deq_split[i].append(split_jpg_data.tobytes())
                else:
                    deq_split[i].append(EMPTY_FRAME_PLACEHOLDER)
                    # logging.error(f"Failed to re-encode split frame {i} from {addr}.")
            else:
                 deq_split[i].append(EMPTY_FRAME_PLACEHOLDER)
                 # logging.warning(f"Invalid slice for split frame {i} from {addr}. Shape: {split_frame_slice.shape}")

    except cv2.error as e:
        logging.error(f"OpenCV error processing frame from {addr}: {e}")
        if original_frame_added_to_merged and deq_merged and deq_merged[-1] is frame_bytes:
             try:
                 deq_merged.pop()
             except IndexError: pass
    except Exception as e:
        logging.error(f"Error processing frame from {addr}: {e}", exc_info=False)
        if original_frame_added_to_merged and deq_merged and deq_merged[-1] is frame_bytes:
             try:
                 deq_merged.pop()
             except IndexError: pass


def tcp_server():
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((TCP_HOST, TCP_PORT))
            server_socket.listen(5)
            logging.info(f"TCP server listening on {TCP_HOST}:{TCP_PORT}")

            while True:
                try:
                    conn, addr = server_socket.accept()
                    client_thread = threading.Thread(
                        target=handle_client,
                        args=(conn, addr),
                        daemon=True,
                        name=f"Client-{addr[0]}:{addr[1]}"
                    )
                    client_thread.start()
                except Exception as e:
                    logging.error(f"Error accepting connection: {e}", exc_info=False)
                    time.sleep(1)

    except Exception as e:
        logging.critical(f"TCP server failed to start or crashed: {e}", exc_info=True)


def generate_mjpeg(frame_deque: deque):
    last_frame_hash = None
    while True:
        frame_to_yield = None
        try:
            if frame_deque:
                current_latest_frame = frame_deque[-1]
                if current_latest_frame != EMPTY_FRAME_PLACEHOLDER:
                    current_hash = hash(current_latest_frame)
                    if current_hash != last_frame_hash:
                       frame_to_yield = current_latest_frame
                       last_frame_hash = current_hash

        except IndexError:
            pass
        except Exception as e:
             logging.error(f"Error accessing frame deque for MJPEG: {e}", exc_info=False)

        if frame_to_yield:
            try:
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + frame_to_yield + b'\r\n')
                time.sleep(MJPEG_SLEEP_TIME_ACTIVE)
            except Exception as e:
                 # logging.warning(f"Error yielding MJPEG frame: {e}")
                 break
        else:
            time.sleep(MJPEG_SLEEP_TIME_IDLE)


# --- Flask Web Server ---
app = Flask(__name__)
log = logging.getLogger('werkzeug')
log.setLevel(logging.WARNING)
app.logger.setLevel(logging.WARNING)


@app.route('/')
def index():
    links = '<h1>Video Streams</h1>'
    links += '<p><a href="/merged_frame" target="_blank">Merged Stream</a></p>'
    for i in range(NUM_CAMERAS):
        links += f'<p><a href="/split_frame/{i}" target="_blank">Camera {i} Stream</a></p>'
    return links

@app.route('/merged_frame')
def merged_frame_feed():
    return Response(generate_mjpeg(deq_merged),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/split_frame/<int:camera_id>')
def split_frame_feed(camera_id):
    if 0 <= camera_id < NUM_CAMERAS:
        return Response(generate_mjpeg(deq_split[camera_id]),
                        mimetype='multipart/x-mixed-replace; boundary=frame')
    else:
        # logging.warning(f"Invalid camera ID requested: {camera_id}")
        return "Invalid camera ID", 404


def shutdown_resources():
    logging.info("Initiating shutdown...")
    executor.shutdown(wait=True, cancel_futures=False)
    logging.info("Thread pool executor shut down.")


if __name__ == "__main__":
    atexit.register(shutdown_resources)

    tcp_thread = threading.Thread(target=tcp_server, daemon=True, name="TCPServerThread")
    tcp_thread.start()

    logging.info(f"Flask server starting on http://{TCP_HOST}:{FLASK_PORT} (or http://127.0.0.1:{FLASK_PORT})")
    try:
        app.run(host='0.0.0.0', port=FLASK_PORT, debug=False, threaded=True)
    except Exception as e:
        logging.critical(f"Flask server failed to start: {e}", exc_info=True)

    logging.info("Flask server stopped.")