In [14]:
# Directory handed to from_pretrained() for both the Whisper model and its processor.
MODEL_DIR = "./whisper-drone-command-final"
# Sampling rate in Hz: used for recording, for the saved WAV, and declared to the feature extractor.
SAMPLE_RATE = 16000
# Default length of one microphone recording, in seconds.
SECONDS = 5
# Default for transcribe_audio_array(force_english=...): ask for an English "transcribe" decoder prompt.
FORCE_ENGLISH = True
# Distance substituted when a command names a direction but no distance
# (the extractor reports distances in meters).
DEFAULT_DISTANCE = 10.0

In [None]:
import time
import json
import re
from difflib import get_close_matches

import numpy as np
import torch
import librosa
import sounddevice as sd
from scipy.io.wavfile import write as wav_write

import nltk
# nltk.download("punkt", quiet=True)
# nltk.download("punkt_tab", quiet=True)
from nltk.tokenize import sent_tokenize

import spacy
from transformers import WhisperForConditionalGeneration, WhisperProcessor


In [17]:
def record_audio(seconds = SECONDS, fs = SAMPLE_RATE):
    """Record a single-channel float32 clip with ``sounddevice``.

    Args:
        seconds: Recording length in seconds.
        fs: Sampling rate in Hz.

    Returns:
        The recorded samples flattened to a 1-D array.
    """
    print(f"Recording {seconds}s at {fs} Hz mono...")
    n_frames = int(seconds * fs)
    buffer = sd.rec(n_frames, samplerate=fs, channels=1, dtype="float32")
    # Block here until the capture started by sd.rec() has finished.
    sd.wait()
    print("Done.")
    mono = buffer.flatten()
    return mono


def save_wav(path, audio, fs = SAMPLE_RATE):
    """Write floating-point audio to ``path`` as a 16-bit PCM WAV file.

    Samples are clipped to [-1.0, 1.0] before being scaled by 32767, so an
    out-of-range sample saturates at full scale instead of overflowing the
    int16 conversion.

    Args:
        path: Destination file path.
        audio: Array-like of float samples, nominally within [-1.0, 1.0].
        fs: Sampling rate in Hz written to the WAV header.
    """
    pcm = (np.clip(audio, -1.0, 1.0) * 32767).astype("int16")
    wav_write(path, fs, pcm)
    print(f"Saved WAV: {path}")


def trim_silence(audio, top_db=30):
    """Trim the signal with ``librosa.effects.trim``.

    Args:
        audio: 1-D sample array.
        top_db: Threshold forwarded to ``librosa.effects.trim``.

    Returns:
        The trimmed array, or ``audio`` itself when trimming leaves nothing.
    """
    trimmed = librosa.effects.trim(audio, top_db=top_db)[0]
    if trimmed.size > 0:
        return trimmed
    # Nothing survived trimming: hand back the untouched input instead.
    return audio


In [18]:
def load_model_and_processor(model_dir = MODEL_DIR):
    """Load the Whisper processor and model from ``model_dir``.

    The model is moved to CUDA when ``torch.cuda.is_available()`` is true,
    otherwise to the CPU, and switched to eval mode.

    Args:
        model_dir: Path or identifier passed to ``from_pretrained``.

    Returns:
        Tuple ``(model, processor, device)`` where ``device`` is ``"cuda"``
        or ``"cpu"``.
    """
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"loading model from {model_dir} on {device}...")
    processor = WhisperProcessor.from_pretrained(model_dir)
    model = WhisperForConditionalGeneration.from_pretrained(model_dir)
    model = model.to(device).eval()
    return model, processor, device


@torch.inference_mode()
def transcribe_audio_array(
    audio_f32_mono_16k: np.ndarray,
    model: WhisperForConditionalGeneration,
    processor: WhisperProcessor,
    device,
    force_english = FORCE_ENGLISH,
    max_new_tokens = 64,
):
    """Transcribe one audio clip with a Whisper model.

    The clip is down-mixed to mono when it is not 1-D, silence-trimmed,
    converted to input features by ``processor.feature_extractor`` and decoded
    with ``model.generate``.

    Args:
        audio_f32_mono_16k: Sample array; anything that is not 1-D is
            transposed and passed through ``librosa.to_mono`` first.
        model: Whisper model used for generation.
        processor: Matching processor (feature extractor + tokenizer).
        device: Device the input tensors are moved to.
        force_english: When true, request English "transcribe" decoder prompt
            ids from the processor and pass them to ``generate``.
        max_new_tokens: Forwarded to ``model.generate``.

    Returns:
        Tuple ``(raw_text, dt)``: the decoded text of the first (only)
        sequence and the seconds spent inside ``model.generate``.
    """
    if audio_f32_mono_16k.ndim != 1:
        audio_f32_mono_16k = librosa.to_mono(audio_f32_mono_16k.T)
    audio_f32_mono_16k = trim_silence(audio_f32_mono_16k, top_db=30)

    feats = processor.feature_extractor(
        audio_f32_mono_16k, sampling_rate=SAMPLE_RATE, return_tensors="pt"
    )
    input_features = feats.input_features.to(device)
    # The mask is only present when the feature extractor chose to return one.
    attention_mask = feats.get("attention_mask")
    if attention_mask is not None:
        attention_mask = attention_mask.to(device)

    gen_kwargs = {}
    if force_english:
        try:
            gen_kwargs["forced_decoder_ids"] = processor.get_decoder_prompt_ids(
                language="english", task="transcribe"
            )
        except Exception as exc:
            # Still best-effort, but no longer silent: the caller asked for
            # English and needs to know the prompt could not be applied.
            print(
                "Warning: could not build the English decoder prompt "
                f"({exc!r}); generating without forced_decoder_ids."
            )

    # perf_counter is monotonic, so the latency cannot be skewed by
    # wall-clock adjustments the way time.time() can.
    t0 = time.perf_counter()
    pred_ids = model.generate(
        input_features,
        attention_mask=attention_mask,
        do_sample=False,
        temperature=0.0,
        num_beams=1,
        length_penalty=0.0,
        max_new_tokens=max_new_tokens,
        **gen_kwargs,
    )
    dt = time.perf_counter() - t0

    raw_text = processor.batch_decode(pred_ids, skip_special_tokens=True)[0]
    return raw_text, dt

# Load once at cell scope so later cells can reuse the same model, processor and device.
model, processor, device = load_model_and_processor(MODEL_DIR)


loading model from ./whisper-drone-command-final on cpu...


In [None]:
# Connector words that separate two commands inside one sentence.
# "and then" is listed first so it is consumed as a unit.
CONNECTOR_PAT = re.compile(r"\b(?:and then|then|and)\b", flags=re.IGNORECASE)

def heuristic_split_commands(text):
    """Split a transcript into individual command sentences.

    The text is whitespace-normalized, cut into sentences with
    ``sent_tokenize``, and each sentence is cut again at the connector words
    matched by ``CONNECTOR_PAT``. Every fragment is returned ending in
    ``.``, ``!`` or ``?``.

    Commas, semicolons and colons left dangling where a connector was removed
    are stripped, and fragments without any word character are dropped, so
    ``"move left, and then go up"`` yields ``["move left.", "go up."]``
    rather than ``"move left,."``.

    Args:
        text: Raw transcript string.

    Returns:
        list[str]: Cleaned command sentences in their original order; empty
        when ``text`` contains no words.
    """
    t = " ".join(text.strip().split())

    preliminary = [s.strip() for s in sent_tokenize(t) if s.strip()]

    segments: list[str] = []
    for seg in preliminary if preliminary else [t]:
        for part in CONNECTOR_PAT.split(seg):
            # Remove the separator punctuation that hugged the connector.
            part = part.strip(" ,;:")
            # Skip leftovers such as "" or "." that carry no command.
            if re.search(r"\w", part):
                segments.append(part)

    cleaned = []
    for s in segments:
        s = re.sub(r"\s+", " ", s).strip()
        s = re.sub(r"\.{2,}$", ".", s)       # collapse a trailing ellipsis
        s = re.sub(r"\s*\.\s*$", ".", s)     # no space before the final period
        if not re.search(r"[.!?]$", s):
            s = s + "."
        cleaned.append(s)
    return cleaned


In [None]:
# The four directions every synonym is mapped onto.
CANONICAL_DIRECTIONS = {"left", "right", "up", "down"}

# Canonical direction -> the words and phrases treated as meaning it.
DIRECTION_SYNONYMS = {
    "left": {
        "left", "leftward", "leftwards",
        "leftside", "left-side", "left side",
        "port", "lft", "to the left",
    },
    "right": {
        "right", "rightward", "rightwards",
        "rightside", "right-side", "right side",
        "starboard", "rgt", "to the right",
    },
    "up": {
        "up", "upward", "upwards",
        "ascend", "rise", "climb", "higher", "elevate",
        "gain altitude", "go above", "above", "increase altitude",
    },
    "down": {
        "down", "downward", "downwards",
        "descend", "lower", "drop", "decrease",
        "reduce altitude", "below", "go below",
    },
}

# Reverse lookup: synonym -> canonical direction.
WORD2CANON = {
    synonym: direction
    for direction, synonyms in DIRECTION_SYNONYMS.items()
    for synonym in synonyms
}


class LemmaFuzzyExtractor:
    """Pull a canonical direction and a distance in meters out of a command.

    Directions are resolved through the module-level ``WORD2CANON`` table;
    tokens and lemmas come from the spaCy pipeline loaded in ``__init__``.
    """

    # (compiled pattern, factor to meters), tried in this order.
    _UNIT_PATTERNS = (
        (re.compile(r'(\d+(?:\.\d+)?)\s*(?:meter|metre|meters|metres|m\b)'), 1.0),
        (re.compile(r'(\d+(?:\.\d+)?)\s*(?:foot|feet|ft\b)'), 0.3048),
        # Accepts singular and plural of both spellings, including "kilometre".
        (re.compile(r'(\d+(?:\.\d+)?)\s*(?:kilometers?|kilometres?|km)\b'), 1000.0),
    )
    _BARE_NUMBER = re.compile(r'\b(\d+(?:\.\d+)?)\b')

    def __init__(self):
        """Load ``en_core_web_sm`` with the parser and NER components disabled."""
        self.nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])

    def normalize_direction(self, text, cutoff = 0.75):
        """Map free text to a canonical direction.

        Three stages are tried in order, and the first hit wins:

        1. exact lookup of each alphabetic token, then each lemma, in
           ``WORD2CANON``;
        2. whole-phrase lookup of the ``WORD2CANON`` keys that are not a single
           alphabetic word (e.g. ``"increase altitude"``), choosing the phrase
           that occurs earliest in the text;
        3. fuzzy lookup of each token/lemma with ``get_close_matches``.

        Stage 2 exists because single tokens can never equal a multi-word key,
        and without it ``"increase altitude"`` falls through to stage 3, where
        ``"increase"`` matches ``"decrease"`` at the default cutoff.

        Args:
            text: Command text.
            cutoff: Minimum similarity for the fuzzy stage.

        Returns:
            The canonical direction string, or ``None`` when nothing matches.
        """
        lowered = text.lower()
        doc = self.nlp(lowered)
        tokens = [t.text for t in doc if t.is_alpha]
        lemmas = [t.lemma_ for t in doc if t.is_alpha]
        terms = tokens + lemmas

        # Stage 1: exact single-word match.
        for w in terms:
            if w in WORD2CANON:
                return WORD2CANON[w]

        # Stage 2: multi-word / hyphenated synonyms matched against the text.
        normalized = " ".join(lowered.split())
        best = None  # (start offset, -phrase length, canonical direction)
        for phrase, canon in WORD2CANON.items():
            if phrase.isalpha():
                continue  # single words were already handled in stage 1
            hit = re.search(r"(?<!\w)" + re.escape(phrase) + r"(?!\w)", normalized)
            if hit is None:
                continue
            candidate = (hit.start(), -len(phrase), canon)
            if best is None or candidate < best:
                best = candidate
        if best is not None:
            return best[2]

        # Stage 3: tolerate misspellings / mis-transcriptions.
        vocab = list(WORD2CANON.keys())
        for w in terms:
            m = get_close_matches(w, vocab, n=1, cutoff=cutoff)
            if m:
                return WORD2CANON[m[0]]

        return None

    def extract_distance_meters(self, text):
        """Extract a distance from ``text`` and convert it to meters.

        A number followed by a meter, foot or kilometer unit is preferred
        (checked in that order). Failing that, the first bare number between
        0.1 and 1000 inclusive is returned as-is.

        Args:
            text: Command text.

        Returns:
            The distance as a float, or ``None`` when no usable number exists.
        """
        tl = text.lower()
        for pattern, factor in self._UNIT_PATTERNS:
            m = pattern.search(tl)
            if m:
                return float(m.group(1)) * factor

        # No unit given: accept a plain number only if it is a plausible distance.
        for n in self._BARE_NUMBER.findall(text):
            v = float(n)
            if 0.1 <= v <= 1000:
                return v

        return None


# Extractor shared by calls that do not supply their own; created on first use.
_SHARED_EXTRACTOR = None


def extract_direction_distance_json(text, extractor=None):
    """Turn a transcript into a list of direction/distance records.

    The text is split with ``heuristic_split_commands`` and each sentence is
    run through the extractor. A sentence with a direction but no distance
    gets ``DEFAULT_DISTANCE``.

    Args:
        text: Raw transcript string.
        extractor: Optional object providing ``normalize_direction`` and
            ``extract_distance_meters``. When omitted, one
            ``LemmaFuzzyExtractor`` is built on first use and reused by later
            calls, so its language model is not reloaded on every call.

    Returns:
        list[dict]: One ``{"sentence", "direction", "distance"}`` dict per
        sentence. ``direction`` is ``""`` when neither a direction nor a
        distance was found, and ``None`` when only a distance was found.
    """
    global _SHARED_EXTRACTOR
    if extractor is None:
        if _SHARED_EXTRACTOR is None:
            _SHARED_EXTRACTOR = LemmaFuzzyExtractor()
        extractor = _SHARED_EXTRACTOR

    outputs: list[dict] = []
    for s in heuristic_split_commands(text):
        direction = extractor.normalize_direction(s)
        distance = extractor.extract_distance_meters(s)
        if direction is not None and distance is None:
            distance = DEFAULT_DISTANCE
        if direction is None and distance is None:
            outputs.append({"sentence": s, "direction": "", "distance": None})
        else:
            outputs.append({"sentence": s, "direction": direction, "distance": distance})
    return outputs


In [26]:
audio = record_audio(seconds=SECONDS, fs=SAMPLE_RATE)
save_wav("mic_audio.wav", audio, fs=SAMPLE_RATE)

raw_text, latency = transcribe_audio_array(audio, model, processor, device, force_english=FORCE_ENGLISH)
print(raw_text)
print(f"\nTime: {latency:.3f}s on {device}")
extracted = extract_direction_distance_json(raw_text)
print("\n=== Direction/Distance JSON ===")
print(json.dumps(extracted, ensure_ascii=False, indent=2))


Recording 5s at 16000 Hz mono...
Done.
Saved WAV: mic_audio.wav
 drone you need to move towards left by 10 meters and go above 5 meters

Time: 0.314s on cpu

=== Direction/Distance JSON ===
[
  {
    "sentence": "drone you need to move towards left by 10 meters.",
    "direction": "left",
    "distance": 10.0
  },
  {
    "sentence": "go above 5 meters.",
    "direction": "up",
    "distance": 5.0
  }
]
