In [None]:
import os
import wave

import pyaudio
import torch
import torchaudio
import whisper
from rapidfuzz import process
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, Wav2Vec2Tokenizer

# ---------- Audio Recording ----------
def record_audio(filename="command.wav", duration=4, rate=16000):
    """Record mono 16-bit audio from the default input device into a WAV file.

    Args:
        filename: Path of the WAV file to write.
        duration: Approximate recording length in seconds. Audio is read in
            whole 1024-frame chunks, so the captured length is rounded down
            to a multiple of the chunk size.
        rate: Sampling rate in Hz used for both capture and the WAV header.

    Returns:
        The ``filename`` that was written.

    Raises:
        Whatever PyAudio raises when the device cannot be opened or read;
        the stream and the PyAudio instance are released before the
        exception propagates.
    """
    chunk = 1024
    sample_format = pyaudio.paInt16

    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(sample_format)
        stream = p.open(format=sample_format, channels=1, rate=rate,
                        input=True, frames_per_buffer=chunk)
        try:
            n_chunks = int(rate / chunk * duration)
            frames = [stream.read(chunk) for _ in range(n_chunks)]
        finally:
            # Always release the input stream, even if a read failed mid-way.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(b''.join(frames))

    return filename


# def record_audio(filename="command.wav", duration=4, rate=16000, device_index=1):
#     try:
#         FORMAT = pyaudio.paInt16
#         CHANNELS = 1
#         CHUNK = 1024

#         audio = pyaudio.PyAudio()

#         print(f"🎙️ Recording from device index {device_index}...")

#         stream = audio.open(format=FORMAT,
#                             channels=CHANNELS,
#                             rate=rate,
#                             input=True,
#                             input_device_index=device_index,
#                             frames_per_buffer=CHUNK)

#         frames = []
#         for _ in range(int(rate / CHUNK * duration)):
#             data = stream.read(CHUNK)
#             frames.append(data)

#         print("✅ Done recording.")

#         stream.stop_stream()
#         stream.close()
#         audio.terminate()

#         with wave.open(filename, 'wb') as wf:
#             wf.setnchannels(CHANNELS)
#             wf.setsampwidth(audio.get_sample_size(FORMAT))
#             wf.setframerate(rate)
#             wf.writeframes(b''.join(frames))

#         return filename

#     except Exception as e:
#         print(f"❌ Error: {e}")
#         return None



# ---------- Facebook Wav2Vec 2.0 ----------
# Load the pretrained speech-recognition checkpoint once, when the cell runs.
# Wav2Vec2Processor is used instead of the Wav2Vec2Tokenizer class because the
# checkpoint declares a 'Wav2Vec2CTCTokenizer'; loading it through
# Wav2Vec2Tokenizer triggers a "tokenizer class ... is not the same type"
# warning. The variable name `wav2vec_tokenizer` is kept because transcribe()
# looks it up by that name for both pre-processing and decoding.
wav2vec_tokenizer = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
wav2vec_model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")
wav2vec_model.eval()  # switch to evaluation mode; the model is only used for inference here

def transcribe(audio_path):
    """Transcribe an audio file to lower-case text with the wav2vec 2.0 model.

    The audio is downmixed to a single channel and resampled to 16 kHz before
    being passed to ``wav2vec_tokenizer`` / ``wav2vec_model``.

    Args:
        audio_path: Path of an audio file readable by ``torchaudio.load``.

    Returns:
        The greedy (argmax) decoded transcription, lower-cased.
    """
    target_rate = 16000
    waveform, sample_rate = torchaudio.load(audio_path)

    # A multi-channel file would otherwise survive squeeze() as a 2-D tensor,
    # be treated as a batch of separate utterances, and only the first row
    # would be decoded. Average the channels into one instead.
    # NOTE(review): assumes torchaudio's default (channels, frames) layout.
    if waveform.dim() > 1 and waveform.size(0) > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sample_rate != target_rate:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_rate)
        waveform = resampler(waveform)

    # Drop only the channel axis so a very short clip still stays 1-D.
    if waveform.dim() > 1:
        waveform = waveform.squeeze(0)

    input_values = wav2vec_tokenizer(waveform.numpy(), return_tensors="pt").input_values

    with torch.no_grad():
        logits = wav2vec_model(input_values).logits

    # Greedy decoding: most likely token id at every time step.
    predicted_ids = torch.argmax(logits, dim=-1)
    transcription = wav2vec_tokenizer.decode(predicted_ids[0])
    return transcription.lower()

# ---------- BERT Intent Classifier ----------
# intent_labels = ["MOVE_FORWARD", "TURN_LEFT", "STOP", "TURN_RIGHT"]

# tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
# model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=4)
# model.load_state_dict(torch.load("models/intent_model.pt", map_location=torch.device('cpu')))
# model.eval()

# def predict_intent(text):
#     inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=32)
#     with torch.no_grad():
#         outputs = model(**inputs)   # use `model` here, not `intent_model`
#         predicted = torch.argmax(outputs.logits, dim=1)
#     return intent_labels[predicted.item()]

# Intent names, indexed by the class id that predict_intent() takes from the
# classifier's argmax.
intent_labels = ["MOVE_FORWARD", "TURN_LEFT", "STOP", "TURN_RIGHT"]

# Canonical spoken phrases that correct_command() snaps noisy transcripts to.
command_phrases = ["move forward", "turn left", "stop", "turn right"]

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=len(intent_labels),
)
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source.
model.load_state_dict(
    torch.load("models/intent_model.pt", map_location="cpu")
)
model.eval()

def correct_command(text):
    """Snap a transcript to the closest known command phrase.

    The lower-cased text is fuzzy-matched against ``command_phrases``. The
    best candidate is returned when its score is strictly greater than 70;
    otherwise the lower-cased input is returned unchanged.
    """
    lowered = text.lower()
    phrase, similarity, _index = process.extractOne(lowered, command_phrases)
    return phrase if similarity > 70 else lowered

def predict_intent(text):
    """Classify a spoken command and return its intent label.

    The text is first normalised with ``correct_command``, then tokenised and
    run through the BERT classifier; the label at the arg-max class index of
    ``intent_labels`` is returned.
    """
    encoded = tokenizer(
        correct_command(text),
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=32,
    )
    with torch.no_grad():
        class_scores = model(**encoded).logits
        winner = torch.argmax(class_scores, dim=1)
    label_index = winner.item()
    return intent_labels[label_index]



def execute_command(cmd):
    """Print the command and the robot action associated with it.

    Args:
        cmd: Intent label such as "MOVE_FORWARD", "TURN_LEFT", "TURN_RIGHT"
            or "STOP". Any other value is reported as an unknown action.
    """
    # One entry per label in intent_labels; "TURN_RIGHT" previously had no
    # branch and was reported as unknown.
    actions = {
        "MOVE_FORWARD": "Robot moving forward...",  # publish to ROS or send to motor
        "TURN_LEFT": "Turning left",
        "TURN_RIGHT": "Turning right",
        "STOP": "Stopping",
    }
    print(f"Command: {cmd}")
    # A linear scan with == (rather than dict.get) keeps the original
    # behaviour for unhashable or otherwise unexpected `cmd` values.
    for label, message in actions.items():
        if cmd == label:
            print(message)
            return
    print("Unknown action")

# ---------- Main Execution ----------
if __name__ == "__main__":
    # Pipeline: record -> transcribe -> classify -> execute.
    # `text` and `intent` start as None so that a failure at any stage leaves
    # them defined; previously execute_command(intent) raised NameError when
    # no intent had been produced.
    text = None
    intent = None

    print("Capturing Audio....")
    audio_path = record_audio()

    if not os.path.exists(audio_path):
        print(f"❌ Audio file '{audio_path}' not found.")
    else:
        print("📢 Transcribing...")
        try:
            text = transcribe(audio_path)
            print("📝 Recognized Speech:", text)
        except Exception as e:
            print(f"❌ Transcription failed: {e}")
            text = None

    if text:
        print("🔍 Predicting Intent...")
        try:
            # predict_intent() already normalises the text with
            # correct_command(), so it is not applied a second time here.
            intent = predict_intent(text)
            print("✅ Intent Detected:", intent)
        except Exception as e:
            print(f"❌ Intent prediction failed: {e}")

    # Only drive the robot when an intent was actually detected.
    if intent is not None:
        execute_command(intent)
    else:
        print("⚠️ No intent detected; no command executed.")




  from .autonotebook import tqdm as notebook_tqdm
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'Wav2Vec2CTCTokenizer'. 
The class this function is called from is 'Wav2Vec2Tokenizer'.
Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Capturing Audio....
📢 Transcribing...
📝 Recognized Speech: ton right ton ight
🔍 Predicting Intent...
✅ Intent Detected: MOVE_FORWARD
Command: MOVE_FORWARD
Robot moving forward...


In [None]:
import pyaudio

# List every audio device PyAudio can see, with its index, so a suitable
# input device index can be picked for recording.
p = pyaudio.PyAudio()
try:
    for i in range(p.get_device_count()):
        info = p.get_device_info_by_index(i)
        print(f"Device {i}: {info['name']}")
finally:
    # Release PyAudio even if querying a device fails part-way through.
    p.terminate()

In [None]:
import os

# Sample recording used by the WAV inspection cell below.
# NOTE(review): this is an absolute path on one machine's E: drive; it will
# report "not found" anywhere else.
audio_path = r"E:\Germany\Document\TU_Dortmund\Projects\Audio_Perception\turn_left.wav"

# Report whether the sample file is present before trying to open it.
print("✅ File exists." if os.path.exists(audio_path) else "❌ File not found.")


In [None]:
import wave

# Open the sample file as WAV and print its channel count, sampling rate and
# duration. Failures are reported instead of raising so the cell never ends in
# a traceback.
try:
    with wave.open(audio_path, 'rb') as wf:
        print("✅ WAV file opened successfully.")
        # Guard against a zero frame rate in a malformed header.
        duration_s = wf.getnframes() / wf.getframerate() if wf.getframerate() else 0.0
        print(f"Channels: {wf.getnchannels()}, Rate: {wf.getframerate()}, Duration: {duration_s:.2f}s")
except (wave.Error, EOFError) as e:
    # EOFError is raised for empty or truncated files.
    print(f"❌ Not a valid WAV file: {e}")
except OSError as e:
    # Missing file, permission problem, etc.
    print(f"❌ Could not open file: {e}")
