In [None]:
import asyncio
import json
import logging
import mimetypes
import os
import wave
from datetime import datetime
from typing import Optional, Callable

import aiohttp
import numpy as np
import websockets

# Configure the root logger at INFO level for the whole process.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the client class and the example below.
logger = logging.getLogger(__name__)

class AudioServerClient:
    """Async client for the audio server.

    Commands and raw audio go over a WebSocket (``/ws/audio``); image upload
    and the health check use plain HTTP against the same host and port.

    Callers may assign ``on_audio_level`` and ``on_recording_complete`` to be
    notified about the corresponding JSON messages from the server.
    """

    # Content type used when the image type cannot be guessed from its name.
    _DEFAULT_IMAGE_CONTENT_TYPE = "image/jpeg"

    def __init__(self, host: str = "localhost", port: int = 8005):
        """Initialize the client SDK for the audio server.

        Args:
            host (str): Server hostname
            port (int): Server port
        """
        self.base_url = f"http://{host}:{port}"
        self.ws_url = f"ws://{host}:{port}/ws/audio"
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.is_recording = False
        self.on_audio_level: Optional[Callable[[int], None]] = None
        self.on_recording_complete: Optional[Callable[[str], None]] = None
        # Strong reference to the background listener so it is not
        # garbage-collected while still running.
        self._listener_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Establish WebSocket connection with the server.

        Returns:
            bool: True when the connection was opened and the listener task
            was started, False when anything raised (the error is logged).
        """
        try:
            self.websocket = await websockets.connect(self.ws_url)
            logger.info("WebSocket connection established")
            # The event loop only keeps weak references to tasks, so hold on
            # to the listener ourselves.
            self._listener_task = asyncio.create_task(self._listen_for_messages())
            return True
        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            return False

    async def _listen_for_messages(self):
        """Receive messages until the connection closes or an error occurs."""
        # Bind the socket once: the loop never spins without awaiting when no
        # socket is set, and a later reconnect cannot make this listener read
        # from the new socket.
        ws = self.websocket
        if ws is None:
            return
        while True:
            try:
                message = await ws.recv()
                if isinstance(message, str):
                    await self._handle_text_message(message)
                else:
                    await self._handle_binary_message(message)
            except websockets.exceptions.ConnectionClosed:
                logger.info("WebSocket connection closed")
                break
            except Exception as e:
                logger.error(f"Error in message listener: {e}")
                break

    async def _handle_text_message(self, message: str):
        """Parse a JSON text message and dispatch it to the matching callback.

        Malformed messages are logged and ignored so that one bad message
        does not terminate the listener loop.
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            logger.error("Failed to parse message as JSON")
            return

        if not isinstance(data, dict):
            logger.warning("Ignoring JSON message that is not an object")
            return

        if data.get("type") == "audio_level":
            if "level" not in data:
                logger.warning("Ignoring audio_level message without 'level'")
                return
            if self.on_audio_level:
                self.on_audio_level(data["level"])
        elif data.get("status") == "success" and "filename" in data:
            if self.on_recording_complete:
                self.on_recording_complete(data["filename"])

    async def _handle_binary_message(self, message: bytes):
        """Handle incoming binary messages."""
        # Implement if server sends binary data
        pass

    async def _send_command(self, command: str) -> bool:
        """Send a JSON command frame; return False when there is no socket."""
        if not self.websocket:
            return False
        await self.websocket.send(json.dumps({
            "type": "command",
            "command": command
        }))
        return True

    async def start_recording(self):
        """Start audio recording (no-op when not connected)."""
        if await self._send_command("start_recording"):
            self.is_recording = True
            logger.info("Recording started")

    async def stop_recording(self):
        """Stop audio recording (no-op when not connected)."""
        if await self._send_command("stop_recording"):
            self.is_recording = False
            logger.info("Recording stopped")

    async def send_audio_data(self, audio_data: bytes):
        """Send audio data to the server as one binary WebSocket message."""
        if self.websocket:
            await self.websocket.send(audio_data)

    async def start_complex_chat(self):
        """Start complex chat mode (no-op when not connected)."""
        if await self._send_command("lets_chat"):
            logger.info("Complex chat mode activated")

    async def stop_complex_chat(self):
        """Stop complex chat mode (no-op when not connected)."""
        if await self._send_command("stop_chat"):
            logger.info("Complex chat mode deactivated")

    @staticmethod
    def _guess_content_type(image_path: str) -> str:
        """Return the MIME type implied by the file name, defaulting to JPEG."""
        guessed, _ = mimetypes.guess_type(image_path)
        return guessed or AudioServerClient._DEFAULT_IMAGE_CONTENT_TYPE

    async def upload_image(self, image_path: str):
        """Upload an image to the server.

        Args:
            image_path (str): Path of the image file to send.

        Returns:
            The decoded JSON body of the server's response.
        """
        # basename handles the platform's path separators, unlike split('/').
        filename = os.path.basename(image_path)
        content_type = self._guess_content_type(image_path)

        async with aiohttp.ClientSession() as session:
            with open(image_path, 'rb') as f:
                data = aiohttp.FormData()
                data.add_field('file',
                               f,
                               filename=filename,
                               content_type=content_type)

                async with session.post(f"{self.base_url}/upload", data=data) as response:
                    result = await response.json()
                    logger.info(f"Image upload result: {result}")
                    return result

    async def check_health(self):
        """Check server health status and return the decoded JSON response."""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/health") as response:
                return await response.json()

    async def close(self):
        """Close the WebSocket connection."""
        if self.websocket:
            await self.websocket.close()
            logger.info("Connection closed")

# Example usage and test code
async def test_client():
    """Run an end-to-end smoke test against a server on the default address.

    Connects, checks health, enables complex chat, streams five seconds of a
    440 Hz tone as int16 samples in 1024-sample chunks, then stops and closes.
    """
    def report_level(level: int):
        logger.info(f"Audio level: {level}")

    def report_saved(filename: str):
        logger.info(f"Recording saved as: {filename}")

    client = AudioServerClient()

    # Nothing else can run without a connection
    if not await client.connect():
        logger.error("Failed to connect to server")
        return

    client.on_audio_level = report_level
    client.on_recording_complete = report_saved

    try:
        health = await client.check_health()
        logger.info(f"Server health: {health}")

        await client.start_complex_chat()
        await client.start_recording()

        # Synthesize the test signal: 5 s sine wave at 16 kHz, scaled to int16
        sample_rate = 16000
        duration = 5  # seconds
        timeline = np.linspace(0, duration, int(sample_rate * duration))
        tone = (np.sin(2 * np.pi * 440 * timeline) * 32767).astype(np.int16)

        # Stream the tone chunk by chunk, pausing briefly between sends
        chunk_size = 1024
        offset = 0
        while offset < len(tone):
            await client.send_audio_data(tone[offset:offset + chunk_size].tobytes())
            await asyncio.sleep(0.01)  # Small delay to prevent overwhelming the server
            offset += chunk_size

        await client.stop_recording()
        await client.stop_complex_chat()

        # Give the server time to deliver any trailing messages
        await asyncio.sleep(2)

    finally:
        await client.close()

if __name__ == "__main__":
    # Run the example client end-to-end when executed as a script;
    # asyncio.run creates the event loop and closes it when test_client returns.
    asyncio.run(test_client())

In [None]:
import streamlit as st
import asyncio
from client_sdk import AudioServerClient
import sounddevice as sd
import numpy as np
import queue
import threading
from datetime import datetime
import time
import os
from PIL import Image

# Seed session state on first run; factories are only called for missing keys,
# so existing values (including the audio queue) survive reruns.
_session_defaults = {
    'client': lambda: None,
    'recording': lambda: False,
    'complex_mode': lambda: False,
    'audio_queue': queue.Queue,
    'audio_level': lambda: 0,
}
for _state_key, _make_default in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()

st.title("🎤 Audio Server Test Interface")

# Sidebar: where to find the audio server
st.sidebar.header("Server Settings")
host = st.sidebar.text_input("Server Host", value="localhost")
port = st.sidebar.number_input("Server Port", min_value=1, max_value=65535, value=8005)

async def connect_to_server():
    """Replace the session's client with a freshly connected one.

    Any existing client is closed first. The new client is built from the
    sidebar's ``host`` and ``port`` values.

    Returns:
        bool: True when the new client connected and was stored in session
        state, False otherwise (session state then holds no client).
    """
    previous = st.session_state.client
    if previous:
        # Clear the reference before closing so that a failed close or a
        # failed reconnect never leaves a closed client in session state.
        st.session_state.client = None
        await previous.close()

    client = AudioServerClient(host=host, port=port)
    if await client.connect():
        st.session_state.client = client
        return True
    return False

def audio_callback(indata, frames, time, status):
    """Queue a copy of each captured audio block while recording is active.

    Passed as ``callback`` to ``sd.InputStream``. A truthy ``status`` is
    printed; blocks that arrive while not recording are dropped.
    """
    # NOTE(review): presumably invoked from sounddevice's audio thread, where
    # st.session_state may not be available - confirm.
    if status:
        print(status)
    if not st.session_state.recording:
        return
    # Copy the block before queueing it
    st.session_state.audio_queue.put(indata.copy())

# Connection controls: connect button on the left, health probe on the right
col1, col2 = st.columns(2)

with col1:
    connect_clicked = st.button("Connect to Server")
    if connect_clicked:
        with st.spinner("Connecting..."):
            connected = asyncio.run(connect_to_server())
            if connected:
                st.success("Connected!")
            else:
                st.error("Connection failed!")

with col2:
    health_clicked = st.button("Check Server Health")
    if health_clicked:
        if not st.session_state.client:
            st.warning("Please connect to server first")
        else:
            with st.spinner("Checking..."):
                health = asyncio.run(st.session_state.client.check_health())
                st.json(health)

# Audio Recording Section
st.header("🎙️ Audio Recording")

col3, col4 = st.columns(2)
with col3:
    # A single button toggles between starting and stopping a recording.
    if st.button("Start Recording" if not st.session_state.recording else "Stop Recording"):
        if st.session_state.client:
            if not st.session_state.recording:
                # Start recording
                st.session_state.recording = True
                asyncio.run(st.session_state.client.start_recording())

                # Start audio input stream (mono, 16 kHz); captured blocks are
                # delivered to audio_callback.
                stream = sd.InputStream(
                    channels=1,
                    samplerate=16000,
                    callback=audio_callback
                )
                stream.start()
                st.session_state.stream = stream
            else:
                # Stop recording
                st.session_state.recording = False
                # The stream is missing if opening the device failed on start;
                # guard so the Stop button still resets the state.
                if 'stream' in st.session_state:
                    st.session_state.stream.stop()
                    st.session_state.stream.close()
                    # Drop the closed stream so nothing stops it again later.
                    del st.session_state['stream']
                asyncio.run(st.session_state.client.stop_recording())
        else:
            st.warning("Please connect to server first")

with col4:
    # Audio level meter
    st.metric("Audio Level", f"{st.session_state.audio_level} dB")

# Complex chat toggle: one button whose label reflects the current mode
st.header("💭 Complex Chat Mode")
col5, col6 = st.columns(2)
with col5:
    chat_label = "Stop Complex Chat" if st.session_state.complex_mode else "Start Complex Chat"
    if st.button(chat_label):
        if not st.session_state.client:
            st.warning("Please connect to server first")
        elif st.session_state.complex_mode:
            # Currently on: tell the server to leave chat mode
            asyncio.run(st.session_state.client.stop_complex_chat())
            st.session_state.complex_mode = False
        else:
            # Currently off: tell the server to enter chat mode
            asyncio.run(st.session_state.client.start_complex_chat())
            st.session_state.complex_mode = True

# Image Upload Section
st.header("📸 Image Upload")
uploaded_file = st.file_uploader("Choose an image file", type=['jpg', 'jpeg', 'png'])
if uploaded_file is not None:
    # Display the uploaded image
    image = Image.open(uploaded_file)
    st.image(image, caption='Uploaded Image', use_column_width=True)

    # Save temporarily and upload
    if st.button("Upload Image"):
        if st.session_state.client:
            with st.spinner("Uploading..."):
                temp_path = f"temp_{int(time.time())}.jpg"

                # Saving happens inside the try so a partially written file is
                # still removed when the save or the upload fails.
                try:
                    # The temp file is a JPEG, which cannot store alpha or
                    # palette modes (common for PNG uploads); convert first.
                    if image.mode in ("L", "RGB", "CMYK"):
                        jpeg_ready = image
                    else:
                        jpeg_ready = image.convert("RGB")
                    jpeg_ready.save(temp_path)

                    # Upload
                    result = asyncio.run(st.session_state.client.upload_image(temp_path))
                    st.json(result)
                finally:
                    # Clean up temp file
                    if os.path.exists(temp_path):
                        os.remove(temp_path)
        else:
            st.warning("Please connect to server first")

# Audio Visualization
st.header("📊 Audio Visualization")
# Empty placeholder element; nothing in the visible code writes to `chart`.
chart = st.empty()

# Background task to update audio visualization
def update_audio_viz():
    """Forward queued audio to the server and refresh the level reading.

    Runs forever; intended as the target of a daemon thread. Every 0.1 s,
    while recording, it drains everything currently in the audio queue,
    stores a level derived from the newest chunk in session state, and sends
    all drained chunks to the connected client in arrival order.
    """
    # NOTE(review): this runs outside the Streamlit script thread, where
    # st.session_state may not be available - confirm.

    async def _send_all(client, chunks):
        # One event loop per tick instead of one per chunk.
        for chunk in chunks:
            await client.send_audio_data(chunk.tobytes())

    while True:
        try:
            if st.session_state.recording:
                # Drain the whole backlog; taking a single chunk per tick lets
                # the queue grow whenever capture is faster than 10 chunks/s.
                pending = []
                while True:
                    try:
                        pending.append(st.session_state.audio_queue.get_nowait())
                    except queue.Empty:
                        break

                if pending:
                    # Mean absolute amplitude of the newest chunk, scaled by 1000.
                    level = int(np.abs(pending[-1]).mean() * 1000)
                    st.session_state.audio_level = level

                    if st.session_state.client:
                        asyncio.run(_send_all(st.session_state.client, pending))
        except Exception as e:
            print(f"Error in audio visualization: {e}")
        time.sleep(0.1)

# Start background thread for audio visualization.
# Streamlit re-executes this script on every interaction; keep the thread in
# session state so each session starts it once instead of once per rerun.
if 'viz_thread' not in st.session_state or not st.session_state.viz_thread.is_alive():
    st.session_state.viz_thread = threading.Thread(target=update_audio_viz, daemon=True)
    st.session_state.viz_thread.start()
viz_thread = st.session_state.viz_thread

# Status and Logs Section
st.header("📝 Status and Logs")
# Placeholder element that update_log() fills with the activity log.
log_container = st.empty()

def update_log(message):
    """Show ``message`` in the "Activity Log" text area of the log placeholder.

    Args:
        message: Text to display as the text area's value.
    """
    # The widget's return value was assigned to an unused local; dropped.
    log_container.text_area("Activity Log", height=200, value=message)

# Cleanup on app close
def cleanup():
    """Close the server connection and release the audio input stream.

    The stream is released in a ``finally`` block so that an error while
    closing the client does not leave the audio device open.
    """
    try:
        if st.session_state.client:
            asyncio.run(st.session_state.client.close())
    finally:
        if hasattr(st.session_state, 'stream'):
            st.session_state.stream.stop()
            st.session_state.stream.close()

# Register cleanup once per session: this script is re-executed on every
# interaction, and an unguarded register() would add one more exit handler
# each time.
import atexit
if 'cleanup_registered' not in st.session_state:
    atexit.register(cleanup)
    st.session_state.cleanup_registered = True