In [None]:
pip install fastapi uvicorn numpy scipy soundfile librosa python-multipart


In [None]:
python nabra_app.py


In [None]:


import os
import json
import time
import uuid
import base64
from dataclasses import dataclass
from typing import Dict, Any, Optional, Tuple, List

import numpy as np
import soundfile as sf
import librosa

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
import uvicorn



# Human-readable service name.
APP_NAME = "Nabra Voice Identity PoC"
# Path of the JSON file used as the persistent store by _load_db/_save_db.
DB_PATH = "nabra_db.json"
# Target sample rate in Hz; load_audio resamples every clip to this rate.
SAMPLE_RATE = 16000


# Accepted clip duration range in seconds; /enroll and /verify reject clips
# outside it with HTTP 400.
MIN_AUDIO_SECONDS = 1.2
MAX_AUDIO_SECONDS = 12.0

MATCH_THRESHOLD = 0.78  # cosine similarity
SPOOF_RISK_THRESHOLD = 0.65  # 0..1 (higher = riskier)
LIVENESS_THRESHOLD = 0.55    # 0..1 (higher = better)

# Advanced Reliability Scoring weights (they sum to 1.0)
W_LIVENESS = 0.30
W_ANTISPOOF = 0.30
W_MATCH = 0.25
W_QUALITY_CONTEXT = 0.15


def _load_db() -> Dict[str, Any]:
    """Read the JSON store from DB_PATH.

    Returns the parsed file content, or a fresh ``{"users": {}}`` structure
    when the file does not exist yet.
    """
    if os.path.exists(DB_PATH):
        with open(DB_PATH, "r", encoding="utf-8") as handle:
            return json.load(handle)
    return {"users": {}}

def _save_db(db: Dict[str, Any]) -> None:
    """Persist ``db`` to DB_PATH as pretty-printed UTF-8 JSON.

    The data is serialized before any file is touched and then written to a
    sibling temporary file that is swapped into place with ``os.replace``.
    Opening DB_PATH directly in "w" mode would truncate the existing store
    first, so a serialization error or a crash mid-write would leave an empty
    or partial file that can no longer be parsed on the next load.

    Raises:
        TypeError / ValueError: if ``db`` is not JSON-serializable (the
            existing file is left untouched in that case).
        OSError: if the temporary file cannot be written or swapped in.
    """
    payload = json.dumps(db, ensure_ascii=False, indent=2)

    tmp_path = f"{DB_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)

    # Replace the old store in a single step so readers never see a
    # half-written file.
    os.replace(tmp_path, DB_PATH)

def _now_ts() -> int:
    """Return the current Unix time truncated to whole seconds."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch)


# -----------------------------
# Audio processing + Features (Multi-layer)
# -----------------------------

# eq=False: the generated __eq__ would compare the ndarray field and raise
# "truth value of an array is ambiguous"; identity comparison is kept instead.
@dataclass(eq=False)
class AudioInfo:
    """Container for a decoded audio clip.

    The ``@dataclass`` decorator generates the keyword constructor
    ``AudioInfo(y=..., sr=..., duration=...)``; without it the class only
    carries annotations and cannot be instantiated with arguments.
    """

    # Audio samples.
    y: np.ndarray
    # Sample rate of ``y`` in Hz.
    sr: int
    # Clip length in seconds.
    duration: float

def load_audio(file_bytes: bytes) -> AudioInfo:
    """Decode an uploaded audio file into a mono, resampled, peak-normalized clip.

    Reads WAV/FLAC/OGG... (whatever the installed soundfile supports).

    Args:
        file_bytes: Raw content of the uploaded file.

    Returns:
        AudioInfo with float32 samples at SAMPLE_RATE and the clip duration in
        seconds. A file that decodes to zero samples yields a zero-duration
        clip, so callers reject it through their minimum-duration check.

    Raises:
        HTTPException: 400 when the bytes cannot be decoded as audio.
    """
    import io

    try:
        with io.BytesIO(file_bytes) as bio:
            y, sr = sf.read(bio, dtype="float32", always_2d=False)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"تعذر قراءة ملف الصوت: {e}") from e

    # soundfile returns (frames, channels) for multi-channel input; average
    # the channels to get mono.
    if y.ndim > 1:
        y = np.mean(y, axis=1)

    # A valid header with no frames decodes to an empty array. Resampling and
    # np.max would fail on it, so return an empty clip instead of crashing.
    if y.size == 0:
        return AudioInfo(y=y, sr=SAMPLE_RATE, duration=0.0)

    # Resample
    if sr != SAMPLE_RATE:
        y = librosa.resample(y, orig_sr=sr, target_sr=SAMPLE_RATE)
        sr = SAMPLE_RATE

    # Peak-normalize; an all-zero clip is left as is.
    peak = float(np.max(np.abs(y)))
    if peak > 0:
        y = y / (peak + 1e-8)

    duration = len(y) / sr
    return AudioInfo(y=y, sr=sr, duration=duration)

def basic_quality_metrics(y: np.ndarray, sr: int) -> Dict[str, float]:
    """
    Simple frame-level quality measurements (30 ms frames, 15 ms hop):
    - energy: mean of the per-frame RMS values
    - silence_ratio: fraction of frames whose RMS is below 0.02
    - snr_proxy: rough SNR, (mean of loudest 20% of frames - mean of quietest
      20%) divided by the quietest mean

    A clip shorter than one frame (or a non-positive frame/hop size) yields
    energy 0.0, silence_ratio 1.0 and snr_proxy 0.0.
    """
    window = int(0.03 * sr)
    step = int(0.015 * sr)
    cannot_frame = window <= 0 or step <= 0 or len(y) < window
    if cannot_frame:
        return {"energy": 0.0, "silence_ratio": 1.0, "snr_proxy": 0.0}

    framed = librosa.util.frame(y, frame_length=window, hop_length=step)
    frame_rms = np.sqrt(np.mean(framed**2, axis=0) + 1e-10)

    # Compare the quietest fifth of the frames with the loudest fifth.
    ordered = np.sort(frame_rms)
    fifth = max(1, len(ordered) // 5)
    quiet_level = np.mean(ordered[:fifth])
    loud_level = np.mean(ordered[-fifth:])

    return {
        "energy": float(np.mean(frame_rms)),
        "silence_ratio": float(np.mean(frame_rms < (0.02))),
        "snr_proxy": float((loud_level - quiet_level) / (quiet_level + 1e-6)),
    }

def extract_multilayer_voiceprint(y: np.ndarray, sr: int) -> Dict[str, Any]:
    """
    Multi-Layer Voiceprint Features

    Builds a hand-crafted feature vector that stands in for a deep speaker
    embedding (e.g. ECAPA):
    - 20 MFCC means and 20 MFCC standard deviations
    - mean spectral centroid and mean spectral flatness
    - pitch statistics from pYIN (voiced ratio, f0 mean, f0 std)
    - a pause proxy (the silence ratio from basic_quality_metrics)

    Args:
        y: Audio samples.
        sr: Sample rate of ``y`` in Hz.

    Returns:
        Dict with the individual features, the L2-normalized "embedding"
        (as a list) and the "quality" dict from basic_quality_metrics.
    """
    # MFCC
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20)
    mfcc_mean = mfcc.mean(axis=1)
    mfcc_std = mfcc.std(axis=1)

    # Spectral
    centroid = librosa.feature.spectral_centroid(y=y, sr=sr).mean()
    flatness = librosa.feature.spectral_flatness(y=y).mean()

    # Pitch (f0) — proxy
    try:
        # sr must be passed explicitly: pyin otherwise assumes its default
        # rate (22050 Hz) and reports mis-scaled f0 values for 16 kHz audio.
        f0, voiced_flag, voiced_probs = librosa.pyin(
            y,
            fmin=librosa.note_to_hz("C2"),
            fmax=librosa.note_to_hz("C7"),
            sr=sr,
        )
        f0 = np.nan_to_num(f0, nan=0.0)
        voiced_ratio = float(np.mean(voiced_flag)) if voiced_flag is not None else 0.0
        voiced_f0 = f0[f0 > 0]
        f0_mean = float(np.mean(voiced_f0)) if voiced_f0.size else 0.0
        f0_std = float(np.std(voiced_f0)) if voiced_f0.size else 0.0
    except Exception:
        # Best effort: pitch tracking failure falls back to neutral values
        # instead of failing the whole request.
        voiced_ratio, f0_mean, f0_std = 0.0, 0.0, 0.0

    # Temporal proxies: pauses
    q = basic_quality_metrics(y, sr)
    pause_proxy = q["silence_ratio"]

    # Build embedding vector (Deep embedding proxy)
    embedding = np.concatenate([
        mfcc_mean, mfcc_std,
        np.array([centroid, flatness, voiced_ratio, f0_mean, f0_std, pause_proxy], dtype=np.float32)
    ]).astype(np.float32)

    # Normalize embedding to unit length (epsilon guards an all-zero vector)
    norm = np.linalg.norm(embedding) + 1e-8
    embedding = embedding / norm

    return {
        "mfcc_mean": mfcc_mean.tolist(),
        "mfcc_std": mfcc_std.tolist(),
        "spectral_centroid": float(centroid),
        "spectral_flatness": float(flatness),
        "voiced_ratio": float(voiced_ratio),
        "f0_mean": float(f0_mean),
        "f0_std": float(f0_std),
        "pause_proxy": float(pause_proxy),
        "embedding": embedding.tolist(),
        "quality": q,
    }

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two vectors, computed in float32.

    A small epsilon is added to each norm so zero vectors give 0.0 instead of
    a division by zero.
    """
    vec_a = a.astype(np.float32)
    vec_b = b.astype(np.float32)
    scale = (np.linalg.norm(vec_a) + 1e-8) * (np.linalg.norm(vec_b) + 1e-8)
    return float(np.dot(vec_a, vec_b) / scale)


# -----------------------------
# Anti-Spoofing
# -----------------------------
def anti_spoof_score(y: np.ndarray, sr: int, feats: Dict[str, Any]) -> float:
    """
    Spoof risk in 0..1 (higher = riskier).
    PoC heuristics, each contributing a weighted term:
    - very high spectral flatness may indicate synthesis / heavy compression (0.45)
    - very low voiced_ratio on a clip with clear energy -> anomaly (0.25)
    - low snr_proxy (0.20) and low energy (0.10) -> poor recording, risk raised

    Only ``feats`` is used; ``y`` and ``sr`` are accepted but not read.
    """
    spectral_flatness = feats["spectral_flatness"]
    voicing = feats["voiced_ratio"]
    quality = feats["quality"]
    energy = quality["energy"]

    flatness_term = np.clip((spectral_flatness - 0.15) / 0.35, 0, 1) * 0.45

    # The voicing anomaly only counts when the clip is loud enough to matter.
    if energy > 0.03:
        voicing_term = np.clip((0.35 - voicing) / 0.35, 0, 1) * 0.25
    else:
        voicing_term = 0.0

    # Weak recording quality
    snr_term = np.clip((0.6 - quality["snr_proxy"]) / 0.6, 0, 1) * 0.20
    energy_term = np.clip((0.02 - energy) / 0.02, 0, 1) * 0.10

    total = flatness_term + voicing_term + snr_term + energy_term
    return float(np.clip(total, 0.0, 1.0))


# -----------------------------
# Liveness + Active Challenge
# -----------------------------
def liveness_score(y: np.ndarray, sr: int, expected_text: str, provided_text: str, feats: Dict[str, Any]) -> float:
    """
    Liveness in 0..1 (higher = better).
    - Active challenge (weight 0.65): exact match, after stripping whitespace,
      between the challenge text and the text supplied with the request
      (in the hackathon setting the user may type the text manually).
      With no expected text the challenge part is a neutral 0.5.
    - Behavioral (weight 0.35): higher voiced_ratio and lower pause_proxy
      (natural speech) score better.

    ``y`` and ``sr`` are accepted but not read.
    """
    wanted = (expected_text or "").strip()
    given = (provided_text or "").strip()

    if wanted:
        challenge_part = 1.0 if wanted == given else 0.0
    else:
        challenge_part = 0.5

    voicing_part = np.clip((feats["voiced_ratio"] - 0.25) / 0.55, 0, 1) * 0.6
    pause_part = np.clip((0.55 - feats["pause_proxy"]) / 0.55, 0, 1) * 0.4
    behavior_part = voicing_part + pause_part

    combined = 0.65 * challenge_part + 0.35 * behavior_part
    return float(np.clip(combined, 0.0, 1.0))


# -----------------------------
# Advanced Reliability Scoring
# -----------------------------
def reliability_score(
    match_sim: float,
    liveness: float,
    spoof_risk: float,
    quality: Dict[str, float],
    context_risk: float = 0.0
) -> Tuple[float, str]:
    """
    Compute a reliability score in 0..100 and the matching decision.
    - match_sim: voiceprint similarity (higher = better)
    - liveness: 0..1 (higher = better)
    - spoof_risk: 0..1 (higher = riskier); converted to anti_spoof = 1 - risk
    - quality / context_risk: blended 70/30 into one small extra factor

    Returns (score, decision) where decision is "ACCEPT" for a score >= 85,
    "STEP_UP" for >= 60 and "REJECT" otherwise.
    """
    anti_spoof = 1.0 - spoof_risk

    # Simple quality score built from three clipped, weighted terms.
    energy_part = np.clip((quality.get("energy", 0.0) - 0.015) / 0.05, 0, 1) * 0.45
    snr_part = np.clip(quality.get("snr_proxy", 0.0) / 2.0, 0, 1) * 0.35
    silence_part = np.clip((0.65 - quality.get("silence_ratio", 1.0)) / 0.65, 0, 1) * 0.20
    quality_score = energy_part + snr_part + silence_part

    # Context risk of 0 means no risk, so invert it into a score.
    context_score = float(np.clip(1.0 - context_risk, 0.0, 1.0))
    quality_context = 0.7 * quality_score + 0.3 * context_score

    weighted = (
        W_LIVENESS * liveness +
        W_ANTISPOOF * anti_spoof +
        W_MATCH * match_sim +
        W_QUALITY_CONTEXT * quality_context
    )
    score = float(np.clip(weighted, 0, 1) * 100.0)

    if score >= 85:
        return score, "ACCEPT"
    if score >= 60:
        # Ask for additional verification (OTP / re-record / question).
        return score, "STEP_UP"
    return score, "REJECT"


# -----------------------------
# FastAPI App
# -----------------------------
# Application instance; the route decorators in this file register handlers on it.
app = FastAPI(title=APP_NAME, version="1.0.0")


@app.get("/")
def home():
    """Report the service name and status, with a pointer to the docs path."""
    payload = {
        "name": APP_NAME,
        "status": "running",
        "docs": "/docs",
    }
    return payload


@app.post("/challenge/new")
def new_challenge(user_id: str = Form(...)):
    """
    Generate a simple active challenge (four random digits to read aloud).
    In the hackathon demo: show it to the user so they can read it.

    The challenge is stored under the user's record together with its
    creation time and lifetime. A user record is created on the fly when the
    user has not enrolled yet.

    Args:
        user_id: Identifier of the user the challenge is issued for.

    Returns:
        Dict with user_id, challenge_id, challenge_text and expires_in_sec.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    expires_in_sec = 180

    # The challenge must be unpredictable to be of any use against replayed
    # recordings, so draw the digits from the OS CSPRNG instead of NumPy's
    # non-cryptographic generator.
    digits = [secrets.choice("0123456789") for _ in range(4)]
    phrase = "اقرأ الأرقام التالية: " + " ".join(digits)

    db = _load_db()
    # Challenges are allowed even for users who have not enrolled yet.
    user = db["users"].setdefault(
        user_id, {"enrollments": [], "meta": {"created_at": _now_ts()}}
    )

    chal_id = str(uuid.uuid4())
    user.setdefault("challenges", {})[chal_id] = {
        "text": phrase,
        "created_at": _now_ts(),
        "expires_in_sec": expires_in_sec,
    }
    _save_db(db)

    return {
        "user_id": user_id,
        "challenge_id": chal_id,
        "challenge_text": phrase,
        "expires_in_sec": expires_in_sec,
    }


@app.post("/enroll")
async def enroll(
    user_id: str = Form(...),
    audio: UploadFile = File(...),
    note: str = Form(""),
):
    """
    Enrollment: build a voiceprint from the uploaded clip and store its
    embedding (plus a few feature values) under the user's record.

    Responds with HTTP 400 when the clip is shorter than MIN_AUDIO_SECONDS or
    longer than MAX_AUDIO_SECONDS.
    """
    raw_upload = await audio.read()
    info = load_audio(raw_upload)

    # Reject clips outside the accepted duration window.
    if info.duration < MIN_AUDIO_SECONDS:
        raise HTTPException(status_code=400, detail=f"الصوت قصير جدًا ({info.duration:.2f}s).")
    if info.duration > MAX_AUDIO_SECONDS:
        raise HTTPException(status_code=400, detail=f"الصوت طويل جدًا ({info.duration:.2f}s).")

    voiceprint = extract_multilayer_voiceprint(info.y, info.sr)

    store = _load_db()
    # Create the user record on first enrollment.
    store["users"].setdefault(user_id, {"enrollments": [], "meta": {"created_at": _now_ts()}})

    enrollment_id = str(uuid.uuid4())
    feature_meta = {
        key: voiceprint[key]
        for key in ("spectral_centroid", "spectral_flatness", "voiced_ratio")
    }
    record = {
        "enrollment_id": enrollment_id,
        "created_at": _now_ts(),
        "note": note,
        "embedding": voiceprint["embedding"],
        "feature_meta": feature_meta,
    }
    store["users"][user_id]["enrollments"].append(record)
    _save_db(store)

    response = {
        "user_id": user_id,
        "enrollment_id": enrollment_id,
        "duration_sec": info.duration,
        "quality": voiceprint["quality"],
        "message": "تم إنشاء بصمة صوتية (Enrollment) بنجاح."
    }
    return response


@app.post("/verify")
async def verify(
    user_id: str = Form(...),
    challenge_id: str = Form(...),
    provided_text: str = Form(""),
    audio: UploadFile = File(...),
):
    """
    Verification:
    1) Multi-layer features
    2) Anti-spoof risk
    3) Liveness (Active challenge match + behavior)
    4) Voiceprint matching (cosine similarity against the best enrollment)
    5) Advanced reliability scoring + decision

    Hard gates override the reliability decision with REJECT when the spoof
    risk is too high, liveness is too low, or the best voiceprint similarity
    is below MATCH_THRESHOLD.

    Raises:
        HTTPException: 404 for an unknown user or challenge; 400 when the
            user has no enrollments, the challenge has expired, or the clip
            duration is outside the accepted range.

    Returns:
        Dict with user_id, decision (ACCEPT / STEP_UP / REJECT) and an
        explanation dict holding the individual scores.
    """
    db = _load_db()
    user = db["users"].get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="المستخدم غير موجود. نفّذ enroll أولًا.")

    enrollments = user.get("enrollments", [])
    if not enrollments:
        raise HTTPException(status_code=400, detail="لا توجد بصمات صوتية للمستخدم. نفّذ enroll أولًا.")

    challenges = user.get("challenges", {})
    chal = challenges.get(challenge_id)
    if not chal:
        raise HTTPException(status_code=404, detail="Challenge غير موجود.")

    # Check that the challenge is still valid.
    # NOTE(review): the challenge is not consumed after use, so it can be
    # replayed until it expires — consider making it single-use.
    age = _now_ts() - int(chal.get("created_at", 0))
    if age > int(chal.get("expires_in_sec", 180)):
        raise HTTPException(status_code=400, detail="انتهت صلاحية التحدي. أنشئ تحدي جديد.")

    expected_text = chal["text"]

    file_bytes = await audio.read()
    info = load_audio(file_bytes)

    if info.duration < MIN_AUDIO_SECONDS:
        raise HTTPException(status_code=400, detail=f"الصوت قصير جدًا ({info.duration:.2f}s).")
    if info.duration > MAX_AUDIO_SECONDS:
        raise HTTPException(status_code=400, detail=f"الصوت طويل جدًا ({info.duration:.2f}s).")

    feats = extract_multilayer_voiceprint(info.y, info.sr)

    # Anti-spoof risk (higher = riskier)
    spoof_risk = anti_spoof_score(info.y, info.sr, feats)

    # Liveness
    live = liveness_score(info.y, info.sr, expected_text, provided_text, feats)

    # Match against the best enrollment
    emb = np.array(feats["embedding"], dtype=np.float32)
    sims: List[Tuple[str, float]] = [
        (e["enrollment_id"], cosine_similarity(emb, np.array(e["embedding"], dtype=np.float32)))
        for e in enrollments
    ]
    # max() returns the first of equal maxima, like the stable descending
    # sort it replaces.
    best_enr_id, best_sim = max(sims, key=lambda pair: pair[1])

    # Context risk placeholder (e.g. many attempts)
    context_risk = 0.0

    rel, decision = reliability_score(
        match_sim=best_sim,
        liveness=live,
        spoof_risk=spoof_risk,
        quality=feats["quality"],
        context_risk=context_risk,
    )

    hard_reject_reasons = []
    if spoof_risk >= SPOOF_RISK_THRESHOLD:
        hard_reject_reasons.append("اشتباه انتحال/صوت مزيف (Anti-Spoofing).")
    if live < LIVENESS_THRESHOLD:
        hard_reject_reasons.append("فشل كشف الحيوية أو التحدي النشط (Liveness/Active Challenge).")
    # The match weight is only 0.25 of the reliability score, so without this
    # gate a different speaker who passes liveness and anti-spoofing could
    # still reach STEP_UP or ACCEPT.
    if best_sim < MATCH_THRESHOLD:
        hard_reject_reasons.append("تشابه البصمة الصوتية أقل من الحد المطلوب (Voiceprint Match).")

    # A failed hard gate rejects even when the reliability score is moderate.
    if hard_reject_reasons:
        decision = "REJECT"

    # Simplified explanation (Explainable)
    explanation = {
        "best_match_enrollment_id": best_enr_id,
        "match_similarity": best_sim,
        "liveness_score": live,
        "spoof_risk": spoof_risk,
        "quality": feats["quality"],
        "reliability_score": rel,
        "hard_reject_reasons": hard_reject_reasons,
        "expected_challenge_text": expected_text,
        "provided_text": provided_text,
    }

    return {
        "user_id": user_id,
        "decision": decision,  # ACCEPT / STEP_UP / REJECT
        "explanation": explanation
    }


@app.get("/users/{user_id}")
def get_user(user_id: str):
    """Return a short summary of a stored user.

    Responds with HTTP 404 when the user is unknown. Embeddings are not
    included in the summary (kept brief).
    """
    record = _load_db()["users"].get(user_id)
    if not record:
        raise HTTPException(status_code=404, detail="المستخدم غير موجود.")

    summary = {
        "user_id": user_id,
        "enrollments_count": len(record.get("enrollments", [])),
        "has_challenges": bool(record.get("challenges")),
        "meta": record.get("meta", {}),
    }
    return summary


if __name__ == "__main__":
    # Start the server on localhost:8000 only when executed as a script.
    # The "nabra_app:app" string names the module and attribute to serve.
    # NOTE(review): this presumably requires the file to be saved as
    # nabra_app.py — confirm.
    uvicorn.run("nabra_app:app", host="127.0.0.1", port=8000, reload=False)
