In [1]:
from IPython.display import clear_output, display, Audio
from ipywebrtc import CameraStream, AudioRecorder
import matplotlib.pyplot as plt
from collections import deque
import numpy as np
import threading
import time
import torch
import io

# Silero VAD model and its helper functions via torch.hub (served from the local hub cache once downloaded).
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=False)
get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks = utils

class AudioStream:
    """Capture microphone audio from the browser and buffer it as decoded tensors.

    A background thread toggles ``AudioRecorder.recording`` every ``chunk_ms``
    milliseconds; each finished recording arrives in ``on_audio_recorded``, is
    decoded with ``read_audio`` at 16 kHz and appended to ``audio_buffer``
    (guarded by ``buffer_lock``) for consumers such as ``VoiceAgent``.
    """

    def __init__(self, chunk_ms=100):
        # Create the buffer before registering the observer that appends to it.
        self.audio_buffer = deque()
        self.buffer_lock = threading.Lock()  # guards audio_buffer across threads
        self.chunk_ms = chunk_ms
        self.running = False
        self.thread = None  # set by start(); lets stop() be called before start()

        self.stream = CameraStream(constraints={
            'audio': {
                'echoCancellation': True,
                'noiseSuppression': True,
                'sampleRate': 16000
            },
            'video': False
        })
        self.recorder = AudioRecorder(stream=self.stream, autosave=False)
        self.recorder.audio.observe(self.on_audio_recorded, names='value')

    def control(self):
        """Display the camera-stream and recorder widgets."""
        display(self.stream)
        display(self.recorder)

    def on_audio_recorded(self, change):
        """Decode one finished recording and append it to ``audio_buffer``."""
        try:
            audio_bytes = change['new']
            if audio_bytes:
                # Wrap in a file-like object: read_audio expects a path or file
                # object, not raw bytes (presumably it defers to torchaudio.load).
                audio = read_audio(io.BytesIO(audio_bytes), sampling_rate=16000)
                with self.buffer_lock:
                    self.audio_buffer.append(audio)
        except Exception as e:
            print(f"Error recording audio: {e}")

    def start(self):
        """Start a daemon thread that records back-to-back ``chunk_ms`` chunks."""
        self.running = True
        def record_loop():
            while self.running:
                self.recorder.recording = True
                time.sleep(self.chunk_ms / 1000)
                self.recorder.recording = False
        self.thread = threading.Thread(target=record_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the recording loop and wait up to 1 s for its thread to exit."""
        self.running = False
        thread = getattr(self, 'thread', None)
        if thread is not None:
            thread.join(timeout=1)

    def _recent_audio(self, seconds=5):
        """Concatenate the newest ``seconds`` worth of buffered chunks, or return None if empty."""
        n_chunks = max(1, int(1000 / self.chunk_ms * seconds))
        # deque does not support slicing: copy under the lock, slice the list.
        with self.buffer_lock:
            recent = list(self.audio_buffer)[-n_chunks:]
        return torch.cat(recent) if recent else None

    def show(self):
        """Continuously plot the last ~5 s of buffered audio (blocks forever)."""
        while True:
            # Plot outside the lock so the recorder callback is never blocked by matplotlib.
            waveform = self._recent_audio()
            if waveform is not None:
                clear_output(wait=True)
                fig, ax = plt.subplots(figsize=(10, 3))
                ax.plot(waveform)
                plt.show()
                plt.close(fig)
            time.sleep(0.1)

# Stop a stream left over from an earlier run of this cell; otherwise its daemon
# thread keeps toggling the old recorder for the rest of the kernel's life.
_previous_stream = globals().get('stream')
if _previous_stream is not None and hasattr(_previous_stream, 'stop'):
    _previous_stream.stop()

stream = AudioStream(chunk_ms=500)
stream.start()

Using cache found in /root/.cache/torch/hub/snakers4_silero-vad_master


In [None]:
class VoiceAgent:
    """Silero-VAD turn-taking loop that reports when the user starts/stops speaking.

    ``run()`` drains ``audio_buffer`` (the deque filled by ``AudioStream``),
    splits the audio into fixed 512-sample frames for the VAD and flips between
    a "responding" and a "user speaking" state from each frame's confidence.
    Every processed frame is kept in ``self.logs``.
    """

    FRAME_SAMPLES = 512  # samples per VAD call; 32 ms at 16 kHz

    def __init__(self, audio_buffer, buffer_lock, vad_thresh: float = 0.1):
        self.audio_buffer = audio_buffer
        self.buffer_lock = buffer_lock  # must be the lock that guards audio_buffer
        self.is_responding = True
        self.user_speaking = False
        self.last_response = 0  # index in self.logs where the current utterance starts
        self.logs = []  # every frame seen so far; never trimmed

        # Reuse the globally loaded VAD model instead of loading it again.
        self.vad = model
        self.vad_thresh = vad_thresh

    def update_state(self, conf):
        """Advance the listen/respond state machine with one frame's VAD confidence.

        NOTE(review): there is no hangover / min-silence window, so a single
        sub-threshold frame ends the utterance; this likely explains the rapid
        listen/respond alternation seen in the cell output.
        """
        if conf > self.vad_thresh:
            if self.is_responding:
                self.user_speaking = True
                self.is_responding = False
                print("Abort all response and listen")
                # infer_frame has already appended the triggering frame; point at it
                # (not past it) so the speech onset stays in the utterance.
                self.last_response = max(len(self.logs) - 1, 0)
        elif self.user_speaking:
            self.user_speaking = False
            self.is_responding = True
            print("Generate Response now")
            # display(Audio(torch.cat(self.logs[self.last_response:-1]), rate=16000))

    def infer_frame(self, frame):
        """Score one VAD frame, log it, and update the turn-taking state."""
        assert len(frame) == self.FRAME_SAMPLES
        conf = self.vad(frame, 16000).item()
        self.logs.append(frame)
        self.update_state(conf)

    def _feed(self, pending, chunk):
        """Append ``chunk`` to ``pending``, run every complete frame, and return the remainder."""
        pending = torch.cat([pending, chunk]) if len(pending) > 0 else chunk
        while len(pending) >= self.FRAME_SAMPLES:
            self.infer_frame(pending[:self.FRAME_SAMPLES])
            pending = pending[self.FRAME_SAMPLES:]
        return pending

    def run(self):
        """Consume the shared audio buffer forever (blocks; interrupt the kernel to stop)."""
        pending = torch.Tensor([])  # leftover samples shorter than one frame
        while True:
            with self.buffer_lock:
                chunk = self.audio_buffer.popleft() if self.audio_buffer else None

            # Sleep if no data to avoid busy-waiting
            if chunk is None:
                time.sleep(0.01)
                continue

            pending = self._feed(pending, chunk)

# Share the stream's buffer *and* its lock so both sides synchronise on the same object.
agent = VoiceAgent(audio_buffer=stream.audio_buffer, buffer_lock=stream.buffer_lock)
agent.run()  # blocks forever; interrupt the kernel to stop listening

Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now
Abort all response and listen
Generate Response now


## Qwen3 Omni vLLM

In [None]:
from IPython.display import clear_output, display, Audio
from ipywebrtc import CameraStream, AudioRecorder
import matplotlib.pyplot as plt
from collections import deque
import numpy as np
import threading
import time
import torch
import io
import os
import tempfile
import torchaudio
import asyncio
from ipywidgets import Output
from queue import Queue

# VAD model (Silero) and its helper functions via torch.hub (served from the local hub cache once downloaded).
model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', force_reload=False)
get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks = utils

# --- LLM setup ---------------------------------------------------------------
# Environment is configured before vLLM is imported below.
# NOTE(review): CUDA_VISIBLE_DEVICES only takes effect if CUDA has not been
# initialised yet in this kernel (torch was already imported above) — confirm.
os.environ.update({
    'VLLM_USE_V1': '0',  # legacy V0 engine (the log reports "Initializing a V0 LLM engine")
    'CUDA_VISIBLE_DEVICES': '0',
})

from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
from transformers import Qwen3OmniMoeProcessor

MODEL_PATH = "Qwen/Qwen3-Omni-30B-A3B-Instruct"
# MODEL_PATH = "Qwen/Qwen3-Omni-30B-A3B-Thinking"

ENGINE_KWARGS = dict(
    model=MODEL_PATH,
    trust_remote_code=True,
    gpu_memory_utilization=0.95,
    limit_mm_per_prompt={'image': 0, 'video': 0, 'audio': 1},  # one audio clip per prompt, no vision
    max_num_seqs=64,
    max_model_len=16384,
    # NOTE(review): forces float16 — confirm the checkpoint is numerically safe in fp16. TODO
    dtype=torch.float16,
    enable_chunked_prefill=True,
)

# Async engine: requests are submitted with generate() and streamed back.
llm = AsyncLLMEngine.from_engine_args(AsyncEngineArgs(**ENGINE_KWARGS))
processor = Qwen3OmniMoeProcessor.from_pretrained(MODEL_PATH)

class AudioStream:
    """Capture microphone audio from the browser and buffer it as decoded tensors.

    A background thread toggles ``AudioRecorder.recording`` every ``chunk_ms``
    milliseconds; each finished recording arrives in ``on_audio_recorded``, is
    decoded with ``read_audio`` at 16 kHz and appended to ``audio_buffer``
    (guarded by ``buffer_lock``) for consumers such as ``VoiceAgent``.
    """

    def __init__(self, chunk_ms=100):
        # Create the buffer before registering the observer that appends to it.
        self.audio_buffer = deque()
        self.buffer_lock = threading.Lock()  # guards audio_buffer across threads
        self.chunk_ms = chunk_ms
        self.running = False
        self.thread = None  # set by start(); lets stop() be called before start()

        self.stream = CameraStream(constraints={
            'audio': {
                'echoCancellation': True,
                'noiseSuppression': True,
                'sampleRate': 16000
            },
            'video': False
        })
        self.recorder = AudioRecorder(stream=self.stream, autosave=False)
        self.recorder.audio.observe(self.on_audio_recorded, names='value')

    def control(self):
        """Display the camera-stream and recorder widgets."""
        display(self.stream)
        display(self.recorder)

    def on_audio_recorded(self, change):
        """Decode one finished recording and append it to ``audio_buffer``.

        Errors are printed, except the "End of file" / "incompatible function
        arguments" messages that empty recordings produce.
        """
        try:
            audio_data = change['new']
            if audio_data:
                # Pass BytesIO object directly to read_audio, not raw bytes
                audio = read_audio(io.BytesIO(audio_data), sampling_rate=16000)
                with self.buffer_lock:
                    self.audio_buffer.append(audio)
        except Exception as e:
            # Suppress common empty audio errors
            if "End of file" not in str(e) and "incompatible function arguments" not in str(e):
                print(f"Error recording audio: {e}")

    def start(self):
        """Start a daemon thread that records back-to-back ``chunk_ms`` chunks."""
        self.running = True
        def record_loop():
            while self.running:
                self.recorder.recording = True
                time.sleep(self.chunk_ms / 1000)
                self.recorder.recording = False
        self.thread = threading.Thread(target=record_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the recording loop and wait up to 1 s for its thread to exit."""
        self.running = False
        thread = getattr(self, 'thread', None)
        if thread is not None:
            thread.join(timeout=1)

    def _recent_audio(self, seconds=5):
        """Concatenate the newest ``seconds`` worth of buffered chunks, or return None if empty."""
        n_chunks = max(1, int(1000 / self.chunk_ms * seconds))
        # deque does not support slicing: copy under the lock, slice the list.
        with self.buffer_lock:
            recent = list(self.audio_buffer)[-n_chunks:]
        return torch.cat(recent) if recent else None

    def show(self):
        """Continuously plot the last ~5 s of buffered audio (blocks forever)."""
        while True:
            # Plot outside the lock so the recorder callback is never blocked by matplotlib.
            waveform = self._recent_audio()
            if waveform is not None:
                clear_output(wait=True)
                fig, ax = plt.subplots(figsize=(10, 3))
                ax.plot(waveform)
                plt.show()
                plt.close(fig)
            time.sleep(0.1)

# Stop any stream left over from an earlier cell/run; otherwise its daemon thread
# keeps toggling the old recorder alongside the new one.
_previous_stream = globals().get('stream')
if _previous_stream is not None and hasattr(_previous_stream, 'stop'):
    _previous_stream.stop()

stream = AudioStream(chunk_ms=500)
stream.start()

class VoiceAgent:
    """Voice agent: Silero VAD detects user turns, Qwen3-Omni (via vLLM) answers them.

    ``run()`` drains ``audio_buffer`` (the deque filled by ``AudioStream``),
    scores fixed 512-sample frames with the VAD and, when the user stops
    speaking, submits the utterance to the async vLLM engine. Status messages
    and the streamed reply are printed into an ``ipywidgets.Output`` widget.
    """

    FRAME_SAMPLES = 512  # samples per VAD call; 32 ms at 16 kHz

    def __init__(self, audio_buffer, buffer_lock, llm, processor, vad_thresh: float = 0.1):
        self.audio_buffer = audio_buffer
        self.buffer_lock = buffer_lock  # must be the lock that guards audio_buffer
        self.is_responding = True
        self.user_speaking = False
        self.last_response = 0  # index in self.logs where the current utterance starts
        self.logs = []  # every VAD frame seen so far (never trimmed)
        self.generating = False  # True while an LLM request is in flight

        # Reuse the globally loaded VAD model instead of loading it again.
        self.vad = model
        self.vad_thresh = vad_thresh

        # LLM components
        self.llm = llm
        self.processor = processor

        # Persistent event loop for vLLM requests; started lazily by _ensure_loop().
        self.loop = None

        # Output widget for Jupyter display
        self.output_widget = Output()
        display(self.output_widget)

        # Queue for streaming tokens (NOTE(review): not used by this class yet)
        self.token_queue = Queue()

    def update_state(self, conf):
        """Advance the listen/respond state machine with one frame's VAD confidence.

        When speech ends and no generation is in flight, the utterance is
        snapshotted and submitted for an LLM reply.
        NOTE(review): no hangover / min-silence window, so one sub-threshold
        32 ms frame is enough to end an utterance.
        """
        if conf > self.vad_thresh:
            if self.is_responding:
                self.user_speaking = True
                self.is_responding = False
                with self.output_widget:
                    print("🎤 User speaking - listening...")
                # infer_frame has already appended the triggering frame; point at it
                # (not past it) so the speech onset stays in the utterance.
                self.last_response = max(len(self.logs) - 1, 0)
        elif self.user_speaking and not self.generating:
            self.user_speaking = False
            self.is_responding = True
            with self.output_widget:
                print("🤖 Generating response...")
            # Snapshot the utterance now so the request cannot pick up later frames
            # or a last_response moved by the next utterance.
            segment = self.logs[self.last_response:]
            user_audio = torch.cat(segment) if segment else None
            # Mark busy before submitting so a second end-of-speech cannot start a
            # concurrent request before _generate_response begins.
            self.generating = True
            asyncio.run_coroutine_threadsafe(
                self._generate_response(user_audio), self._ensure_loop()
            )

    def _ensure_loop(self):
        """Return the agent's persistent event loop, starting it in a daemon thread on first use.

        The async vLLM engine binds its background processing to the event loop
        it is first used from (per vLLM's AsyncLLMEngine implementation), so all
        requests go through this one loop instead of a fresh, later-closed loop
        per request.
        """
        loop = getattr(self, 'loop', None)
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            threading.Thread(target=loop.run_forever, daemon=True).start()
            self.loop = loop
        return loop

    def _run_async_generation(self, user_audio=None):
        """Run ``_generate_response`` on the persistent loop and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(
            self._generate_response(user_audio), self._ensure_loop()
        )
        future.result()

    async def _generate_response(self, user_audio=None):
        """Stream an LLM reply to the user's utterance into the output widget.

        Args:
            user_audio: 1-D tensor holding the utterance (presumably 16 kHz, the
                rate the VAD is run at). When None, it is sliced from
                ``self.logs`` starting at ``self.last_response``.
        """
        tmp_path = None
        try:
            self.generating = True

            with self.output_widget:
                print("[DEBUG] Starting generation...")

            if user_audio is None:
                # Get user audio from logs
                if self.last_response >= len(self.logs):
                    with self.output_widget:
                        print("No audio to process")
                    return
                user_audio = torch.cat(self.logs[self.last_response:])

            with self.output_widget:
                print(f"[DEBUG] User audio shape: {user_audio.shape}")

            # Write the utterance to a temporary WAV referenced by the chat message;
            # it is removed in `finally` even if generation fails.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                tmp_path = tmp_file.name
            torchaudio.save(tmp_path, user_audio.unsqueeze(0), 16000)

            with self.output_widget:
                print(f"[DEBUG] Audio saved to: {tmp_path}")

            # Prepare messages
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Respond to this audio naturally and concisely"},
                        {"type": "audio", "audio": tmp_path},
                    ],
                }
            ]

            sampling_params = SamplingParams(
                temperature=0.6,
                top_p=0.95,
                top_k=20,
                max_tokens=512,
            )

            text = self.processor.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
            )

            # vLLM takes multimodal data inside the prompt dict; generate() has no
            # multi_modal_data / mm_processor_kwargs keyword arguments.
            inputs = {
                'prompt': text,
                'multi_modal_data': {
                    "audio": [user_audio]
                },
                "mm_processor_kwargs": {
                    "use_audio_in_video": True,
                },
            }

            # Generate unique request ID
            request_id = f"request-{time.time()}"

            with self.output_widget:
                print("[DEBUG] Starting stream generation...")
                print("Assistant: ", end="", flush=True)

            full_response = ""
            num_tokens = 0

            with self.output_widget:
                print("[DEBUG] Calling llm.generate()...")

            results_generator = self.llm.generate(inputs, sampling_params, request_id)

            with self.output_widget:
                print("[DEBUG] Got generator, starting iteration...")

            iteration_count = 0
            async for request_output in results_generator:
                iteration_count += 1
                with self.output_widget:
                    print(f"[DEBUG] Iteration {iteration_count}, outputs: {len(request_output.outputs) if request_output.outputs else 0}")

                if request_output.outputs:
                    completion = request_output.outputs[0]
                    num_tokens = len(completion.token_ids)
                    # Assumes cumulative outputs: print only the newly generated suffix.
                    new_text = completion.text[len(full_response):]
                    if new_text:
                        with self.output_widget:
                            print(new_text, end="", flush=True)
                        full_response = completion.text

            with self.output_widget:
                print(f"[DEBUG] Loop completed after {iteration_count} iterations")

            with self.output_widget:
                print()  # New line
                print(f"[Complete] Total tokens: {num_tokens}")

        except Exception as e:
            with self.output_widget:
                print(f"\n[ERROR] Error generating response: {e}")
                import traceback
                print("[ERROR] Full traceback:")
                traceback.print_exc()
        finally:
            # Cleanup temp file on every path, including errors.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
            with self.output_widget:
                print("[DEBUG] Setting generating=False")
            self.generating = False

    def infer_frame(self, frame):
        """Score one VAD frame, log it, and update the turn-taking state."""
        assert len(frame) == self.FRAME_SAMPLES
        conf = self.vad(frame, 16000).item()
        self.logs.append(frame)
        self.update_state(conf)

    def _feed(self, pending, chunk):
        """Append ``chunk`` to ``pending``, run every complete frame, and return the remainder."""
        pending = torch.cat([pending, chunk]) if len(pending) > 0 else chunk
        while len(pending) >= self.FRAME_SAMPLES:
            self.infer_frame(pending[:self.FRAME_SAMPLES])
            pending = pending[self.FRAME_SAMPLES:]
        return pending

    def run(self):
        """Consume the shared audio buffer forever (blocks; interrupt the kernel to stop)."""
        pending = torch.Tensor([])  # leftover samples shorter than one frame
        while True:
            with self.buffer_lock:
                chunk = self.audio_buffer.popleft() if self.audio_buffer else None

            # Sleep if no data to avoid busy-waiting
            if chunk is None:
                time.sleep(0.01)
                continue

            pending = self._feed(pending, chunk)

# Share the stream's buffer *and* its lock so both sides synchronise on the same object.
agent = VoiceAgent(
    audio_buffer=stream.audio_buffer,
    buffer_lock=stream.buffer_lock,
    llm=llm,
    processor=processor,
)
agent.run()  # blocks forever; interrupt the kernel to stop listening

Using cache found in /root/.cache/torch/hub/snakers4_silero-vad_master


INFO 11-19 17:22:32 [__init__.py:244] Automatically detected platform cuda.


Unrecognized keys in `rope_scaling` for 'rope_type'='default': {'mrope_section', 'mrope_interleaved', 'interleaved'}
Unrecognized keys in `rope_scaling` for 'rope_type'='default': {'mrope_section', 'interleaved'}


INFO 11-19 17:22:41 [config.py:841] This model supports multiple tasks: {'generate', 'classify', 'reward', 'embed'}. Defaulting to 'generate'.


`torch_dtype` is deprecated! Use `dtype` instead!


INFO 11-19 17:22:41 [config.py:1472] Using max model len 16384
INFO 11-19 17:22:43 [config.py:2285] Chunked prefill is enabled with max_num_batched_tokens=5120.
INFO 11-19 17:22:43 [llm_engine.py:230] Initializing a V0 LLM engine (v0.11.1rc7.dev231+g8bd45fc0b.d20251119) with config: model='Qwen/Qwen3-Omni-30B-A3B-Instruct', speculative_config=None, tokenizer='Qwen/Qwen3-Omni-30B-A3B-Instruct', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config={}, tokenizer_revision=None, trust_remote_code=True, dtype=torch.float16, max_seq_len=16384, download_dir=None, load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=None, enforce_eager=False, kv_cache_dtype=auto,  device_config=cuda, decoding_config=DecodingConfig(backend='xgrammar', disable_fallback=False, disable_any_whitespace=False, disable_additional_properties=False, reasoning_backend=''), observability_config=ObservabilityConfig(show_

You are attempting to use Flash Attention 2 without specifying a torch dtype. This might lead to unexpected behaviour


INFO 11-19 17:22:45 [weight_utils.py:292] Using model weights format ['*.safetensors']


Loading safetensors checkpoint shards:   0% Completed | 0/15 [00:00<?, ?it/s]


INFO 11-19 17:23:20 [default_loader.py:272] Loading weights took 34.85 seconds
INFO 11-19 17:23:21 [model_runner.py:1203] Model loading took 59.1623 GiB and 35.545065 seconds


The image processor of type `Qwen2VLImageProcessor` is now loaded as a fast processor by default, even if the model checkpoint was saved with a slow processor. This is a breaking change and may produce slightly different outputs. To continue using the slow processor, instantiate this class with `use_fast=False`. Note that this behavior will be extended to all models in a future release.




  torch.tensor([1] * torch.tensor(video_grid_thw).shape[0]))


INFO 11-19 17:23:28 [worker.py:294] Memory profiling takes 7.12 seconds
INFO 11-19 17:23:28 [worker.py:294] the current vLLM instance can use total_gpu_memory (79.25GiB) x gpu_memory_utilization (0.95) = 75.29GiB
INFO 11-19 17:23:28 [worker.py:294] model weights take 59.16GiB; non_torch_memory takes 0.09GiB; PyTorch activation peak memory takes 3.91GiB; the rest of the memory reserved for KV Cache is 12.13GiB.
INFO 11-19 17:23:28 [executor_base.py:113] # cuda blocks: 8278, # CPU blocks: 2730
INFO 11-19 17:23:28 [executor_base.py:118] Maximum concurrency for 16384 tokens per request: 8.08x
INFO 11-19 17:23:32 [model_runner.py:1513] Capturing cudagraphs for decoding. This may lead to unexpected consequences if the model is not static. To run the model in eager mode, set 'enforce_eager=True' or use '--enforce-eager' in the CLI. If out-of-memory error occurs during cudagraph capture, consider decreasing `gpu_memory_utilization` or switching to eager mode. You can also reduce the `max_num_s

Capturing CUDA graph shapes:   0%|          | 0/11 [00:00<?, ?it/s]

INFO 11-19 17:23:42 [model_runner.py:1671] Graph capturing finished in 10 secs, took 0.25 GiB
INFO 11-19 17:23:42 [llm_engine.py:428] init engine (profile, create kv cache, warmup model) took 20.95 seconds


Output()

🎤 User speaking - listening...
