In [1]:
# pip install PyAudioWPatch

from scipy.signal import resample
import pyaudiowpatch as pyaudio
import time
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import torch
import wave
import soundcard as sc
import soundfile as sf
import scipy.io.wavfile as wavfile

def save_wave(audio_frames, rate, wav_file="temp_audio.wav"):
    """Write floating-point audio to a 16-bit PCM WAV file.

    Parameters
    ----------
    audio_frames : array-like of float
        Samples nominally in the range [-1.0, 1.0].
    rate : int
        Sample rate in Hz stored in the WAV header.
    wav_file : str, optional
        Destination path. Defaults to ``"temp_audio.wav"``, the path this
        function has always written to.
    """
    # Clip before scaling: values beyond +/-1.0 would otherwise overflow the
    # int16 range and wrap around, which is audible as loud clicks.
    samples = np.clip(np.asarray(audio_frames, dtype=np.float64), -1.0, 1.0)
    # Scale back to 16-bit PCM format for saving
    audio_normalized = (samples * 32767).astype(np.int16)
    # Save the normalized audio to a WAV file
    wavfile.write(wav_file, rate, audio_normalized)

In [2]:

# Selectable capture sources; INPUT_DEVICE_IDX picks which one this run uses.
INPUT_DEVICE = {1:'microphone', 2:'speaker'}
# For pc_speaker: go to the sound icon and right click -> choose Sounds ->
# Recording -> set "Stereo Mix" as the default device.
INPUT_DEVICE_IDX = 1

# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Load model and processor (same checkpoint for both)
WHISPER_CHECKPOINT = "openai/whisper-small"
processor = WhisperProcessor.from_pretrained(WHISPER_CHECKPOINT)
model = WhisperForConditionalGeneration.from_pretrained(WHISPER_CHECKPOINT).to(device)
# forced_decoder_ids = processor.get_decoder_prompt_ids(language="french", task="transcribe")#for french to french
# forced_decoder_ids = processor.get_decoder_prompt_ids(language="french", task="translate")#for french to english
forced_decoder_ids = None # for english to english



def select_input_device(audio, kind):
    """Return the PyAudio device-info dict to record from.

    Parameters
    ----------
    audio : pyaudio.PyAudio
        An initialised PyAudio (PyAudioWPatch) instance.
    kind : str
        ``'microphone'`` for the default input device, or ``'speaker'`` for
        the loopback counterpart of the default WASAPI output device.

    Raises
    ------
    RuntimeError
        If ``kind`` is ``'speaker'`` and no matching loopback device exists.
    ValueError
        If ``kind`` is not one of the supported values.
    """
    if kind == 'microphone':
        return audio.get_default_input_device_info()

    if kind == 'speaker':
        # Get default WASAPI info
        wasapi_info = audio.get_host_api_info_by_type(pyaudio.paWASAPI)
        # Get default WASAPI speakers
        speakers = audio.get_device_info_by_index(wasapi_info["defaultOutputDevice"])
        if speakers["isLoopbackDevice"]:
            return speakers
        # Try to find a loopback device with the same name (plus a
        # "[Loopback]" suffix). Name matching is the only link available
        # between an output device and its loopback twin.
        for loopback in audio.get_loopback_device_info_generator():
            if speakers["name"] in loopback["name"]:
                return loopback
        # Without this, the plain output device would be returned and the
        # later attempt to open it as an input stream would fail obscurely.
        raise RuntimeError(
            f"No loopback device found for default output device {speakers['name']!r}"
        )

    raise ValueError(f"Unsupported input device kind: {kind!r}")


# Initialize PyAudio
p = pyaudio.PyAudio()

try:
    micro_phone = select_input_device(p, INPUT_DEVICE[INPUT_DEVICE_IDX])
except Exception:
    # Release PortAudio if no usable device could be selected.
    p.terminate()
    raise

print(f'The input device is {micro_phone}')

# Settings for recording audio
FORMAT = pyaudio.paInt16  # 16-bit signed PCM, decoded as int16 below
# NOTE(review): the PyAudioWPatch loopback examples open the stream with the
# device's own channel count; a mono request on a stereo loopback endpoint is
# presumably rejected -- confirm on the target machine. The microphone path
# keeps capturing mono as before.
CHANNELS = max(1, int(micro_phone['maxInputChannels'])) if micro_phone.get('isLoopbackDevice') else 1
RATE = int(micro_phone['defaultSampleRate'])  # Native sample rate of the capture device
WHISPER_RATE = 16000  # Whisper expects 16kHz input
CHUNK = 1024  # Number of frames per buffer
TRANSCRIPTION_INTERVAL = 5  # Interval for transcription in seconds


def _decode_chunk(data, channels):
    """Convert interleaved int16 PCM bytes into mono float32 samples."""
    samples = np.frombuffer(data, np.int16).astype(np.float32) / 32768.0
    if channels > 1:
        # Drop a trailing partial frame (if any), then average the channels.
        usable = len(samples) - len(samples) % channels
        samples = samples[:usable].reshape(-1, channels).mean(axis=1)
    return samples


def _resample_to_whisper(chunks):
    """Join native-rate mono chunks and resample them once to WHISPER_RATE."""
    if not chunks:
        return np.array([], dtype=np.float32)
    samples = np.concatenate(chunks)
    target_len = int(round(len(samples) * WHISPER_RATE / RATE))
    if target_len == 0:
        return np.array([], dtype=np.float32)
    if target_len == len(samples):
        return samples.astype(np.float32)
    # Resampling the whole interval in one call avoids the boundary artifacts
    # and the per-chunk length truncation (e.g. 371.52 -> 371 samples) that
    # resampling every 1024-frame chunk separately introduces.
    return resample(samples, target_len).astype(np.float32)


# Open a stream to record audio
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                frames_per_buffer=CHUNK, input_device_index=micro_phone["index"])

print("Listening for audio... Speak now.")

pending_chunks = []      # Native-rate mono chunks captured since the last transcription
recorded_segments = []   # 16 kHz segments, in order, for the final WAV file
audio_buffer = np.array([], dtype=np.float32)  # Most recent 16 kHz segment sent to Whisper
audio_frames = np.array([], dtype=np.float32)  # Full 16 kHz recording, filled in on stop
last_transcription_time = time.time()  # Initialize the last transcription time

try:
    # Open a text file to save the transcriptions. UTF-8 keeps non-ASCII
    # transcriptions (e.g. French) writable regardless of the OS default.
    with open("transcriptions_cpu.txt", "a", encoding="utf-8") as transcription_file:
        try:
            while True:
                # Read a chunk of audio (CHUNK frames per read)
                data = stream.read(CHUNK, exception_on_overflow=False)
                pending_chunks.append(_decode_chunk(data, CHANNELS))

                # Check if it's time to perform transcription
                current_time = time.time()
                if current_time - last_transcription_time < TRANSCRIPTION_INTERVAL:
                    continue
                last_transcription_time = current_time  # Update last transcription time

                audio_buffer = _resample_to_whisper(pending_chunks)
                if audio_buffer.size == 0:  # Ensure there's audio to transcribe
                    pending_chunks = []
                    continue
                recorded_segments.append(audio_buffer)
                pending_chunks = []

                # NOTE(review): the stream is not read while the model runs, so
                # audio arriving during inference can be dropped by the driver.
                start_translation_time = time.time()

                input_features = processor(audio_buffer, sampling_rate=WHISPER_RATE, return_tensors="pt").input_features
                # Generate token ids
                predicted_ids = model.generate(input_features.to(device), forced_decoder_ids=forced_decoder_ids)
                # Decode token ids to text
                transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
                transcription_text = transcription[0]

                translation_duration = time.time() - start_translation_time

                # Save the transcription with a timestamp to the file
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                transcription_file.write(f"[{timestamp}] {transcription_text}\n")
                transcription_file.flush()  # Ensure it's written to the file immediately

                # Print the transcription
                print(f"Transcription ({translation_duration:0.3f}s): {transcription_text}")

        except KeyboardInterrupt:
            print("Stopped listening.")
            # Include audio captured since the last transcription in the recording.
            tail = _resample_to_whisper(pending_chunks)
            if tail.size > 0:
                recorded_segments.append(tail)
            if recorded_segments:
                audio_frames = np.concatenate(recorded_segments)
            save_wave(audio_frames, rate=WHISPER_RATE)
finally:
    # Stop and close the stream even if capture or transcription raised.
    stream.stop_stream()
    stream.close()
    p.terminate()



Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


The loopback device is {'index': 1, 'structVersion': 2, 'name': 'Headset (realme Buds Wireless 3', 'hostApi': 0, 'maxInputChannels': 2, 'maxOutputChannels': 0, 'defaultLowInputLatency': 0.09, 'defaultLowOutputLatency': 0.09, 'defaultHighInputLatency': 0.18, 'defaultHighOutputLatency': 0.18, 'defaultSampleRate': 44100.0, 'isLoopbackDevice': False}
Listening for audio... Speak now.


  attn_output = torch.nn.functional.scaled_dot_product_attention(


Transcription (2.597s):  Special tokens have been added in the vocabulary make
Transcription (0.524s):  screen.
Stopped listening.


In [13]:
import pyaudiowpatch as pyaudio

# New PyAudio instance used for the device queries in this cell.
p = pyaudio.PyAudio()

# Look up the WASAPI host API, then the device it reports as its default output.
wasapi_info = p.get_host_api_info_by_type(pyaudio.paWASAPI)
default_output_index = wasapi_info["defaultOutputDevice"]
default_speakers = p.get_device_info_by_index(default_output_index)

# If that device is not itself a loopback device, scan the loopback devices
# for the first one whose name contains the speakers' name.
if not default_speakers["isLoopbackDevice"]:
    speaker_name = default_speakers["name"]
    for loopback in p.get_loopback_device_info_generator():
        """
        Try to find loopback device with same name(and [Loopback suffix]).
        Unfortunately, this is the most adequate way at the moment.
        """
        if speaker_name not in loopback["name"]:
            continue
        default_speakers = loopback
        break

default_speakers

{'index': 26,
 'structVersion': 2,
 'name': 'Headphones (realme Buds Wireless 3 Stereo) [Loopback]',
 'hostApi': 2,
 'maxInputChannels': 2,
 'maxOutputChannels': 0,
 'defaultLowInputLatency': 0.003,
 'defaultLowOutputLatency': 0.0,
 'defaultHighInputLatency': 0.01,
 'defaultHighOutputLatency': 0.0,
 'defaultSampleRate': 44100.0,
 'isLoopbackDevice': True}

In [15]:
# Display the default input device info dict reported by the PyAudio instance `p`.
p.get_default_input_device_info()

{'index': 1,
 'structVersion': 2,
 'name': 'Headset (realme Buds Wireless 3',
 'hostApi': 0,
 'maxInputChannels': 2,
 'maxOutputChannels': 0,
 'defaultLowInputLatency': 0.09,
 'defaultLowOutputLatency': 0.09,
 'defaultHighInputLatency': 0.18,
 'defaultHighOutputLatency': 0.18,
 'defaultSampleRate': 44100.0,
 'isLoopbackDevice': False}