# API vs Direct Chat Manual Inspection Notebook
This notebook provides two helper functions:
1. **api_chat** - calls the running BeautyAI REST API.
2. **direct_chat** - invokes the internal ChatService directly.

You can: 
- Ask the *same* question with identical parameters.
- Inspect whether responses match.
- Compare output structure / format.
- Check whether the same ChatService instance ID persists across calls (printed as `chat_service_id` in the direct result).

No regex cleaning, no auto comparison; just raw outputs for your manual review.

In [16]:
import sys, json, logging, requests
from pathlib import Path
from typing import Any, Dict

# Silence noisy loggers
# Root logger and each listed library logger are raised to CRITICAL so that
# only the notebook's own print() output shows up in the cells below.
logging.basicConfig(level=logging.CRITICAL)
for noisy in ["uvicorn", "uvicorn.error", "uvicorn.access", "httpx", "transformers", "torch"]:
    logging.getLogger(noisy).setLevel(logging.CRITICAL)

# Add backend source
# Path('..').resolve() is the parent of the current working directory and
# .parent climbs one more level, so this points at <two levels above cwd>/src.
# NOTE(review): depends on where the kernel is started — confirm the layout.
backend_src = Path('..').resolve().parent / 'src'
# Inserted at position 0 so it is searched first; must run before the
# beautyai_inference imports below.
sys.path.insert(0, str(backend_src))

from beautyai_inference.services.inference.chat_service import ChatService  # type: ignore
from beautyai_inference.config.config_manager import AppConfig  # type: ignore
from beautyai_inference.services.model.registry_service import ModelRegistryService  # type: ignore

# Base URL of the running REST API; api_chat() appends '/inference/chat' to it.
API_BASE_URL = 'http://127.0.0.1:8000'
# Module-level instances shared by every direct_chat() call. They are created
# once per kernel session, so id(_CHAT_SERVICE) stays the same across calls.
_CHAT_SERVICE = ChatService()
_APP_CONFIG = AppConfig()
_MODEL_REGISTRY_SERVICE = ModelRegistryService()

def api_chat(model_name: str, message: str, **params) -> Dict[str, Any]:
    """POST one chat request to the running REST API and return the decoded reply.

    Args:
        model_name: Sent as the ``model_name`` field of the JSON payload.
        message: Sent as the ``message`` field of the JSON payload.
        **params: Extra fields merged into the payload unchanged.

    Returns:
        The parsed JSON body. If the body cannot be parsed, a dict with
        ``error``, ``status_code`` and ``text`` describing the raw response.
    """
    request_body = {'model_name': model_name, 'message': message, **params}
    http_response = requests.post(
        f'{API_BASE_URL}/inference/chat',
        json=request_body,
        timeout=60,
    )
    try:
        decoded = http_response.json()
    except Exception:
        # Keep the status code and raw text so a non-JSON reply stays inspectable.
        return {
            'error': True,
            'status_code': http_response.status_code,
            'text': http_response.text,
        }
    return decoded

def direct_chat(model_name: str, message: str, **params) -> Dict[str, Any]:
    """Run one chat turn through the in-process ChatService, bypassing the REST API.

    Args:
        model_name: Name looked up in the model registry.
        message: User message passed to ``ChatService.chat``.
        **params: Generation parameters. ``disable_content_filter`` is passed
            to ``chat`` as its own argument; ``enable_thinking`` is removed
            from the generation config and not forwarded anywhere.

    Returns:
        A dict with ``final_content``, ``detected_language``, ``session_id``,
        ``chat_service_id`` (``id()`` of the shared service instance) and
        ``model_name``; or ``{'error': ...}`` when the model is not registered.
    """
    # Load the registry only if the config does not already carry one.
    registry = getattr(_APP_CONFIG, 'model_registry', None)
    if not registry:
        _APP_CONFIG.load_model_registry()

    model_cfg = _MODEL_REGISTRY_SERVICE.get_model(_APP_CONFIG, model_name)
    if model_cfg is None:
        return {'error': f'model {model_name} not in registry'}

    # NOTE(review): 'enable_thinking' is dropped here and never reaches chat();
    # confirm this matches how the API path treats the flag.
    excluded_keys = ('disable_content_filter', 'enable_thinking')
    gen_cfg = {key: value for key, value in params.items() if key not in excluded_keys}

    filter_disabled = params.get('disable_content_filter', False)
    chat_outcome = _CHAT_SERVICE.chat(
        message=message,
        model_name=model_name,
        model_config=model_cfg,
        generation_config=gen_cfg,
        conversation_history=[],
        response_language='auto',
        session_id=None,
        disable_content_filter=filter_disabled,
    )
    # The third element of the returned 4-tuple is not used by this notebook.
    response, detected_language, _unused, session_id = chat_outcome

    return dict(
        final_content=response,
        detected_language=detected_language,
        session_id=session_id,
        chat_service_id=id(_CHAT_SERVICE),
        model_name=model_name,
    )
# Printed once the cell has run to the end, i.e. both helpers are defined.
print('Functions ready: api_chat, direct_chat')

Functions ready: api_chat, direct_chat


In [17]:
# Example usage — adjust model, message, and params as needed.
# The same model, message and parameters go to both paths so the two raw
# results printed below can be compared by eye.
model_name = 'qwen3-unsloth-q4ks'  # change if needed
message = '/no_think What is botox?'
params = dict(
    temperature=0.0,
    top_p=0.95,
    max_new_tokens=200,
    do_sample=False,
    disable_content_filter=True,
    enable_thinking=False,
)

api_result = api_chat(model_name, message, **params)
direct_result = direct_chat(model_name, message, **params)

# ensure_ascii=False keeps non-ASCII text readable in the dump.
print('=== API RESULT RAW JSON ===', json.dumps(api_result, ensure_ascii=False, indent=2), sep='\n')
print('=== DIRECT RESULT RAW JSON ===', json.dumps(direct_result, ensure_ascii=False, indent=2), sep='\n')

=== API RESULT RAW JSON ===
{
  "success": true,
  "data": null,
  "timestamp": "2025-08-22T02:24:52.188558Z",
  "execution_time_ms": 127.55966186523438,
  "response": "…",
  "session_id": "default",
  "model_name": "qwen3-unsloth-q4ks",
  "generation_stats": {
    "model_info": {},
    "generation_config_used": {
      "temperature": 0.0,
      "top_p": 0.95,
      "top_k": 20,
      "repetition_penalty": 1.05,
      "max_new_tokens": 200,
      "do_sample": false,
      "enable_thinking": false
    },
    "content_filter_config": {
      "strictness_level": "disabled"
    },
    "performance": {
      "total_time_ms": 127.55966186523438,
      "generation_time_ms": 125.53858757019043,
      "tokens_generated": 1,
      "tokens_per_second": 7.96567827753268,
      "thinking_tokens": 0
    }
  },
  "effective_config": {
    "temperature": 0.0,
    "top_p": 0.95,
    "top_k": 20,
    "repetition_penalty": 1.05,
    "max_new_tokens": 200,
    "do_sample": false,
    "enable_thinking": fa

In [18]:
# Simple test to debug the model response generation
import requests

# Test just the API with a simple question
test_payload = {
    "model_name": "qwen3-unsloth-q4ks",
    "message": "ما هي الفائدة الرئيسية لعلاج الضوء النبدي المكثف؟",
    "temperature": 0.7,
    "max_new_tokens": 150,
    "disable_content_filter": True,
}

print("Testing API with Arabic question...")
response = requests.post('http://127.0.0.1:8000/inference/chat', json=test_payload, timeout=60)
print(f"Status Code: {response.status_code}")

if response.status_code == 200:
    result = response.json()
    # dict.get() only falls back when the key is missing; an explicit JSON null
    # would come back as None and make the [:500] slice raise TypeError.
    response_text = result.get('response')
    if response_text is None:
        response_text = 'No response field'
    # Only the first 500 characters are shown to keep the cell output short.
    print(f"Response: {response_text[:500]}")
    print(f"Language: {result.get('language', 'No language field')}")
else:
    print(f"Error: {response.text}")

Testing API with Arabic question...


Status Code: 200
Response: <think>
Okay, the user is asking about the main benefit of Intense Pulsed Light (IPL) treatment. Let me recall what IPL is used for. It's a non-invasive procedure that uses broad-spectrum light to target various skin issues.
First, I should mention the primary uses: things like sun damage, age spots, freckles, and uneven skin tone. Also, it can help with vascular issues like spider veins and rosacea. Maybe also mention hair removal as another application.
But the user specifically asked for the 
Language: No language field


In [13]:
# Test the /no_think instruction specifically
# Same question as the previous test, with "/no_think" appended to the message.
test_payload_no_think = {
    "model_name": "qwen3-unsloth-q4ks",
    "message": "ما هي الفائدة الرئيسية لعلاج الضوء النبدي المكثف؟ /no_think",
    "temperature": 0.7,
    "max_new_tokens": 150,
    "disable_content_filter": True,
}

print("Testing with /no_think instruction...")
response = requests.post('http://127.0.0.1:8000/inference/chat', json=test_payload_no_think, timeout=60)
print(f"Status Code: {response.status_code}")

if response.status_code == 200:
    result = response.json()
    # dict.get() only falls back when the key is missing; an explicit JSON null
    # would come back as None and make len() raise TypeError.
    response_text = result.get('response')
    shown_text = 'No response field' if response_text is None else response_text
    print(f"Response: {shown_text}")
    print(f"Language: {result.get('language', 'No language field')}")
    print(f"Response Length: {len(response_text or '')}")
else:
    print(f"Error: {response.text}")

Testing with /no_think instruction...
Status Code: 200
Response: <think>
Okay, the user is asking about the main benefit of Intense Pulsed Light (IPL) treatment. Let me recall what IPL is used for. It's a non-invasive procedure that uses broad-spectrum light to target various skin issues.
First, I should mention the primary uses: things like sun damage, pigmentation, and vascular lesions. The main benefit here is improving skin texture and appearance by reducing these issues. Also, IPL can help with hair removal, but the user might be more interested in skin rejuvenation.
I need to make sure the answer is clear and concise, in Arabic only. Avoid medical jargon so it's easy for patients to understand. Highlight the key points: treating pigmentation, sunspots
Language: No language field
Response Length: 702


In [19]:
# Test the fixed API with enable_thinking=False explicitly
test_payload_fixed = {
    "model_name": "qwen3-unsloth-q4ks",
    "message": "ما هي الفائدة الرئيسية لعلاج الضوء النبدي المكثف؟",
    "temperature": 0.3,
    "top_p": 0.95,
    "max_new_tokens": 150,
    "disable_content_filter": True,
    "enable_thinking": False,  # Explicitly disable thinking mode
}

print("Testing with enable_thinking=False...")
response = requests.post('http://127.0.0.1:8000/inference/chat', json=test_payload_fixed, timeout=60)
print(f"Status Code: {response.status_code}")

if response.status_code == 200:
    result = response.json()
    # dict.get() only falls back when the key is missing; an explicit JSON null
    # would come back as None and make len() raise TypeError.
    response_text = result.get('response')
    shown_text = 'No response field' if response_text is None else response_text
    print(f"Response: {shown_text}")
    print(f"Language: {result.get('language', 'No language field')}")
    print(f"Response Length: {len(response_text or '')}")
    print(f"Thinking Enabled: {result.get('thinking_enabled', 'N/A')}")
else:
    print(f"Error: {response.text}")

Testing with enable_thinking=False...


Status Code: 200
Response: العلاج بالضوء النبدي المكثف (IPL) يُستخدم بشكل رئيسي لتحسين مظهر البشرة من خلال تقليل البقع الداكنة، التصبغات، والتهابات البشرة. كما يساعد في تقليل ظهور الشعر الزائد وتحسين نضارة البشرة.
Language: No language field
Response Length: 186
Thinking Enabled: False


In [15]:
# Check recent API logs for debug messages about enable_thinking
import subprocess

print("Checking recent API logs for enable_thinking debug messages...")
try:
    # Reads the last two minutes of the service journal. The 10 s timeout keeps
    # the cell from hanging if the command blocks.
    result = subprocess.run([
        "sudo", "journalctl", "-u", "beautyai-api.service",
        "--since", "2 minutes ago", "--no-pager"
    ], capture_output=True, text=True, timeout=10)
except Exception as e:
    print(f"Error checking logs: {e}")
else:
    # A non-zero exit would otherwise look exactly like "no matching lines",
    # because only stdout is inspected below.
    if result.returncode != 0:
        print(f"journalctl exited with code {result.returncode}: {result.stderr.strip()}")

    keywords = ['enable_thinking', 'chat template', 'fallback', 'debug']
    relevant_lines = [
        line for line in result.stdout.splitlines()
        if any(keyword in line.lower() for keyword in keywords)
    ]

    if not relevant_lines:
        print("No matching log lines found in the last 2 minutes.")

    for line in relevant_lines[-10:]:  # Show last 10 relevant lines
        print(line)

Checking recent API logs for enable_thinking debug messages...


In [21]:
# Raw Streaming Debug (no UI) - Test local streaming endpoint 
import asyncio, math, json, time, contextlib, os, ssl
from pathlib import Path

try:
    import websockets  # type: ignore
except ImportError:
    raise RuntimeError("Install websockets: pip install websockets")

# Local server configuration (confirmed working)
HOST = '127.0.0.1:8000'
USE_SECURE = False  # Local server uses HTTP/WS
LANGUAGE = 'ar'  # 'ar' | 'en' | 'auto'
# The default is a machine-specific absolute path. Set BEAUTYAI_PCM_PATH to use
# another file without editing the notebook.
PCM_PATH = Path(os.environ.get(
    'BEAUTYAI_PCM_PATH',
    '/home/lumi/beautyai/voice_tests/input_test_questions/pcm/q10.pcm',
))
FRAME_MS = 20  # duration of each audio frame sent over the socket
FAST = True  # if False, will pace real-time
TAIL_SILENCE_MS = 800  # amount of zeroed audio appended after the file
AUTO_CLOSE_AFTER_FINAL_S = 15.0  # Increased timeout

print(f"Config -> host={HOST} secure={USE_SECURE} language={LANGUAGE} file={PCM_PATH}")
# Fail early with a clear message instead of a read error inside stream_debug().
assert PCM_PATH.exists(), f"PCM file not found: {PCM_PATH}"

# First, verify streaming endpoint is available
import requests
try:
    # Follow USE_SECURE so the probe uses the same transport security as the
    # WebSocket connection (ws/wss) opened by stream_debug().
    status_scheme = 'https' if USE_SECURE else 'http'
    stream_status = requests.get(f"{status_scheme}://{HOST}/api/v1/ws/streaming-voice/status", timeout=5)
    if stream_status.status_code == 200:
        print(f"✅ Streaming voice status: {stream_status.status_code}")
        status_data = stream_status.json()
        print(f"   Enabled: {status_data.get('enabled')}, Active sessions: {status_data.get('active_sessions')}")
    else:
        # Show the success mark only for 200; any other code is reported as a failure.
        print(f"❌ Streaming voice status: {stream_status.status_code}")
        print(f"❌ Stream status error: {stream_status.text}")

except Exception as e:
    print(f"❌ Local server connection failed: {e}")

async def stream_debug():
    """Stream the PCM file to the streaming-voice WebSocket and log every event.

    Reads ``PCM_PATH``, cuts it into ``FRAME_MS``-millisecond chunks, sends the
    chunks followed by ``TAIL_SILENCE_MS`` of zeroed audio, and concurrently
    prints each JSON event received. Waiting stops once both a
    ``final_transcript`` and a ``tts_complete`` event were seen, or
    ``AUTO_CLOSE_AFTER_FINAL_S`` seconds after sending finished.

    Connection errors are printed, not raised; the summary is still returned.

    Returns:
        dict: endpoint, file, byte and frame counts, language, final transcript
        (or None), ``tts_complete`` flag, number of JSON events, first-event
        latency in ms (or None) and total duration in seconds.
    """
    pcm_bytes = PCM_PATH.read_bytes()
    # 16000 is presumably the sample rate in Hz — confirm it matches the file.
    samples_per_frame = int(16000 * FRAME_MS / 1000)
    # 2 bytes per sample (assumes 16-bit mono PCM — TODO confirm).
    frame_bytes = samples_per_frame * 2
    total_frames = math.ceil(len(pcm_bytes)/frame_bytes)
    scheme = 'wss' if USE_SECURE else 'ws'
    url = f"{scheme}://{HOST}/api/v1/ws/streaming-voice?language={LANGUAGE}"
    print(f"\nConnecting to {url}")
    print(f"Audio: {total_frames} frames, {len(pcm_bytes)} bytes")

    events = []
    final_transcript = None
    tts_complete = False
    first_event_time = None
    # Taken before connecting, so latency and duration include connection setup.
    start = time.time()

    async def sender(ws):
        """Send the file frame by frame, then the trailing silence."""
        print("📤 [sender] Starting to send audio frames...")
        cursor = 0
        frame_index = 0

        while cursor < len(pcm_bytes):
            # The last chunk may be shorter than frame_bytes; it is sent as-is.
            chunk = pcm_bytes[cursor: cursor+frame_bytes]
            cursor += frame_bytes
            frame_index += 1

            try:
                await ws.send(chunk)
                if frame_index % 100 == 0:  # Progress every 100 frames
                    print(f"   Sent frame {frame_index}/{total_frames}")
            except Exception as e:
                print(f"❌ [sender] Send error at frame {frame_index}: {e}")
                return

            if not FAST:
                await asyncio.sleep(FRAME_MS/1000)

        # Send trailing silence
        print("🔇 [sender] Sending trailing silence...")
        silence_frames = max(1, int(TAIL_SILENCE_MS/FRAME_MS))
        silence_chunk = b"\x00\x00" * samples_per_frame

        for _ in range(silence_frames):
            try:
                await ws.send(silence_chunk)
            except Exception as e:
                print(f"❌ [sender] Trailing silence send error: {e}")
                break
            if not FAST:
                await asyncio.sleep(FRAME_MS/1000)

        print("✅ [sender] Finished sending all audio data")

    async def receiver(ws, final_event):
        """Print incoming events until transcript and TTS are both complete."""
        nonlocal final_transcript, tts_complete, first_event_time
        message_count = 0

        try:
            while True:
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=2.0)
                    message_count += 1
                except asyncio.TimeoutError:
                    print("⏰ [receiver] No message received for 2 seconds, continuing...")
                    continue
                except Exception as e:
                    print(f"🔚 [receiver] Recv ended after {message_count} messages: {e}")
                    break

                now = time.time()
                if first_event_time is None:
                    first_event_time = now

                try:
                    data = json.loads(msg)
                except ValueError:
                    # ValueError covers JSONDecodeError and also the
                    # UnicodeDecodeError json.loads() raises for undecodable
                    # bytes, which would otherwise end this loop via the outer
                    # handler.
                    print(f"📄 [receiver] Non-JSON message #{message_count}, length={len(msg)}")
                    continue

                if not isinstance(data, dict):
                    # Valid JSON that is not an object has no 'type' to read.
                    print(f"📄 [receiver] Non-object JSON message #{message_count}: {str(data)[:100]}")
                    continue

                events.append(data)
                etype = data.get('type')
                raw_content = data.get('text', data.get('content', ''))
                # Null or non-string payloads must not break the preview slice.
                content = '' if raw_content is None else str(raw_content)[:100]  # Truncate long content
                print(f"📨 [event #{message_count}] {etype}: {content}")

                if etype == 'final_transcript' and final_transcript is None:
                    final_transcript = data.get('text')
                    print(f"🎤 FINAL TRANSCRIPT: {final_transcript}")

                if etype == 'tts_complete':
                    tts_complete = True
                    print("🔊 TTS COMPLETE")

                if final_transcript and tts_complete:
                    print("✅ Both transcript and TTS complete!")
                    final_event.set()
                    break

        except Exception as e:
            print(f"❌ [receiver] Error: {e}")

    try:
        async with websockets.connect(url, ping_interval=30) as ws:
            print("🔗 WebSocket connected successfully!")

            final_event = asyncio.Event()

            # Start both sender and receiver
            sender_task = asyncio.create_task(sender(ws))
            receiver_task = asyncio.create_task(receiver(ws, final_event))

            # Wait for sender to complete
            await sender_task
            print("📤 Sender finished, waiting for processing...")

            # Wait for final event or timeout
            try:
                await asyncio.wait_for(final_event.wait(), timeout=AUTO_CLOSE_AFTER_FINAL_S)
                print("✅ Stream completed successfully!")
            except asyncio.TimeoutError:
                print(f"⏰ Timeout waiting for completion ({AUTO_CLOSE_AFTER_FINAL_S}s)")

            # Cancel receiver task
            receiver_task.cancel()

            try:
                await receiver_task
            except asyncio.CancelledError:
                pass

    except Exception as e:
        print(f"❌ WebSocket connection error: {e}")

    end = time.time()
    print("\n=== STREAMING DEBUG SUMMARY ===")
    summary = {
        'endpoint': f"{scheme}://{HOST}",
        'file': str(PCM_PATH),
        'bytes': len(pcm_bytes),
        'frames': total_frames,
        'language': LANGUAGE,
        'final_transcript': final_transcript,
        'tts_complete': tts_complete,
        'events_total': len(events),
        'first_event_latency_ms': int((first_event_time-start)*1000) if first_event_time else None,
        'duration_s': round(end-start, 3),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))

    return summary

# Run the stream debug
# Top-level `await` relies on the notebook kernel allowing it at cell level.
result = await stream_debug()
# "Success" only means a final transcript was captured; tts_complete is not checked.
print(f"\n🏁 Test completed. Success: {result['final_transcript'] is not None}")

Config -> host=127.0.0.1:8000 secure=False language=ar file=/home/lumi/beautyai/voice_tests/input_test_questions/pcm/q10.pcm
✅ Streaming voice status: 200
   Enabled: True, Active sessions: 1

Connecting to ws://127.0.0.1:8000/api/v1/ws/streaming-voice?language=ar
Audio: 269 frames, 171870 bytes
🔗 WebSocket connected successfully!
📤 [sender] Starting to send audio frames...
   Sent frame 100/269
   Sent frame 200/269
🔇 [sender] Sending trailing silence...
✅ [sender] Finished sending all audio data
📤 Sender finished, waiting for processing...
📨 [event #1] ready: 
📨 [event #2] decoder_started: 
📨 [event #3] ingest_mode: 


📨 [event #4] partial_transcript: ما هي الفائدة الرئيسية لعلاج الضوء النبدي المكثف؟
📨 [event #5] metrics_snapshot: 
📨 [event #6] perf_cycle: 
📨 [event #7] perf_cycle: 
📨 [event #8] endpoint_event: 
📨 [event #9] perf_cycle: 
📨 [event #10] perf_cycle: 
📨 [event #11] endpoint_event: 
📨 [event #12] final_transcript: ما هي الفائدة الرئيسية لعلاج الضوء النبدي المكثف؟
🎤 FINAL TRANSCRIPT: ما هي الفائدة الرئيسية لعلاج الضوء النبدي المكثف؟
📨 [event #13] perf_cycle: 
📨 [event #14] assistant_pipeline_start: 
📨 [event #15] tts_start: 
📨 [event #16] endpoint_event: 
📨 [event #17] endpoint_event: 
📨 [event #18] heartbeat: 
📨 [event #19] endpoint_event: 
📨 [event #20] endpoint_event: 
📨 [event #21] endpoint_event: 
📨 [event #22] endpoint_event: 
📨 [event #23] assistant_response: الفائدة الرئيسية لعلاج الضوء النبدي المكثف (IPL) هي تقليل البقع الداكنة والتصبغات على البشرة، وتحسين
📨 [event #24] endpoint_event: 
📨 [event #25] heartbeat: 
📨 [event #26] endpoint_event: 
📨 [event #27] endpoint_event: 
📨 [even

In [None]:
# Test remote API availability and streaming voice status
import requests
import ssl
import urllib3

# Disable SSL warnings for self-signed certificates
# The requests below are made with verify=False; this silences the
# InsecureRequestWarning urllib3 would otherwise emit for them.
# NOTE(review): presumably this applies for the rest of the kernel session — confirm.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def test_remote_api(base_url: str = "https://api.gmai.sa", verify: bool = False):
    """Probe a remote deployment: health, streaming-voice status and one chat call.

    Each probe is independent; a failure is printed and the next one still
    runs. Nothing is returned.

    Args:
        base_url: Scheme and host of the API to probe, without trailing slash.
        verify: Passed to ``requests`` as ``verify``. The default ``False``
            skips TLS certificate validation (kept for the self-signed
            certificate case); pass ``True`` to validate the certificate.
    """
    # Test basic API health
    try:
        response = requests.get(f"{base_url}/health", verify=verify, timeout=10)
        print(f"API Health Status: {response.status_code}")
        if response.status_code == 200:
            print(f"API Health Response: {response.json()}")
    except Exception as e:
        print(f"API Health Check Failed: {e}")

    # Test streaming voice status endpoint
    try:
        response = requests.get(f"{base_url}/api/v1/ws/streaming-voice/status", verify=verify, timeout=10)
        print(f"Streaming Voice Status: {response.status_code}")
        if response.status_code == 200:
            print(f"Streaming Voice Response: {response.json()}")
        else:
            print(f"Error Response: {response.text}")
    except Exception as e:
        print(f"Streaming Voice Status Check Failed: {e}")

    # Test basic inference endpoint
    try:
        test_payload = {
            "model_name": "qwen3-unsloth-q4ks",
            "message": "Hello test",
            "disable_content_filter": True,
            "max_new_tokens": 10
        }
        response = requests.post(f"{base_url}/inference/chat", json=test_payload, verify=verify, timeout=30)
        print(f"Inference Test Status: {response.status_code}")
        if response.status_code == 200:
            result = response.json()
            # `or ''` also covers an explicit JSON null, which the .get()
            # default alone would not, so a 200 reply is not reported as failed.
            print(f"Inference Test Success - Response length: {len(result.get('response') or '')}")
        else:
            # Only the first 200 characters of the error body are shown.
            print(f"Inference Error: {response.text[:200]}")
    except Exception as e:
        print(f"Inference Test Failed: {e}")

# Run the remote checks; results are printed, nothing is returned.
test_remote_api()

In [None]:
# Test remote WSS endpoint that user confirmed working in browser
# This is just to verify the connection format is correct
import asyncio
import ssl
import json

try:
    import websockets  # type: ignore
except ImportError:
    raise RuntimeError("Install websockets: pip install websockets")

async def test_remote_wss_connection():
    """Open the remote WSS streaming endpoint once and report what happens.

    Connects with certificate checks disabled, waits briefly, prints the
    first message if one arrives within 3 seconds, then closes. All errors
    are printed, not raised.
    """
    url = "wss://api.gmai.sa/api/v1/ws/streaming-voice?language=ar"
    print(f"Testing remote WSS connection: {url}")

    # Certificate validation is switched off for the self-signed certificate
    # case. check_hostname has to be cleared before verify_mode is set to
    # CERT_NONE.
    insecure_ctx = ssl.create_default_context()
    insecure_ctx.check_hostname = False
    insecure_ctx.verify_mode = ssl.CERT_NONE

    try:
        # Reaching the body of the `async with` means the handshake succeeded.
        async with websockets.connect(url, ssl=insecure_ctx, ping_interval=30) as ws:
            print("✅ Remote WSS connection successful!")
            print("🎧 Connection headers confirm WebSocket upgrade worked")

            # Nothing is sent here; just give the server a moment first.
            await asyncio.sleep(1)

            # Look for a message the server may send on its own.
            try:
                first_msg = await asyncio.wait_for(ws.recv(), timeout=3.0)
                parsed = json.loads(first_msg)
                print(f"📨 Received initial message: {parsed}")
            except asyncio.TimeoutError:
                print("⏰ No initial message (normal for streaming endpoint)")
            except Exception as recv_err:
                print(f"📄 Initial message error: {recv_err}")

            print("🔚 Closing test connection")

    except Exception as conn_err:
        print(f"❌ Remote WSS connection failed: {conn_err}")
        print("   This might be due to network restrictions or server configuration")

# Test the remote endpoint format
# Top-level `await` relies on the notebook kernel allowing it at cell level.
await test_remote_wss_connection()