Shutdown issue #20

Open
ashulyak opened this issue Jan 20, 2024 · 10 comments

ashulyak commented Jan 20, 2024

Hello, I have a problem with the shutdown method when using use_microphone=False: it always gets stuck on

    logging.debug('Finishing recording thread')
    if self.recording_thread:
        self.recording_thread.join()

Example code:

if __name__ == '__main__':
    import pyaudio
    import threading
    from RealtimeSTT import AudioToTextRecorder
    import wave
    import time

    import logging


    recorder = None
    recorder_ready = threading.Event()

    recorder_config = {
        'spinner': False,
        'use_microphone': False,
        'model': "tiny.en",
        'language': 'en',
        'silero_sensitivity': 0.4,
        'webrtc_sensitivity': 2,
        'post_speech_silence_duration': 0.7,
        'min_length_of_recording': 0,
        'min_gap_between_recordings': 0
    }

    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    CHUNK = 1024

    REALTIMESTT = True


    def recorder_thread():
        global recorder
        print("Initializing RealtimeSTT...")
        recorder = AudioToTextRecorder(**recorder_config,level=logging.DEBUG)
        print("RealtimeSTT initialized")
        recorder_ready.set()
        while True:
            full_sentence = recorder.text()
            if full_sentence:
                print(f"\rSentence: {full_sentence}")




    recorder_thread = threading.Thread(target=recorder_thread)
    recorder_thread.start()
    recorder_ready.wait()
    with wave.open('Iiterviewing.wav', 'rb') as wav_file:
        assert wav_file.getnchannels() == CHANNELS
        assert wav_file.getsampwidth() == pyaudio.get_sample_size(FORMAT)
        assert wav_file.getframerate() == RATE
        data = wav_file.readframes(CHUNK)
        while data:
            time.sleep(0.1)
            recorder.feed_audio(data)
            data = wav_file.readframes(CHUNK)
    print("before")
    recorder.shutdown()
    print("after")


Owner

KoljaB commented Jan 21, 2024

You are right, it's a bug, thanks for pointing this out. Will release new version soon with a bugfix.

KoljaB added a commit that referenced this issue Jan 29, 2024
KoljaB added a commit that referenced this issue Jan 29, 2024

nsadeh commented Apr 21, 2024

I think I have a similar problem. I have a recorder per WebSocket connection in a setup, and I can't seem to clean up the recorder.

recorder.shutdown() hangs forever, and if I don't call it, I get a resource leak.

Here's my code:

import asyncio
import gc
import json
import threading
from RealtimeSTT import AudioToTextRecorder
import numpy as np
from scipy.signal import resample


def decode_and_resample(
        audio_data,
        original_sample_rate,
        target_sample_rate) -> bytes:

    # Decode 16-bit PCM data to numpy array
    audio_np = np.frombuffer(audio_data, dtype=np.int16)

    # Calculate the number of samples after resampling
    num_original_samples = len(audio_np)
    num_target_samples = int(num_original_samples * target_sample_rate /
                             original_sample_rate)

    # Resample the audio
    resampled_audio = resample(audio_np, num_target_samples)

    return resampled_audio.astype(np.int16).tobytes()


class RealtimeAudio:
    recorder: AudioToTextRecorder

    def recorder_config(self):
        return {
            'spinner': False,
            'use_microphone': False,
            'model': 'large-v2',
            'language': 'en',
            'silero_sensitivity': 0.4,
            'webrtc_sensitivity': 2,
            'post_speech_silence_duration': 0.7,
            'min_length_of_recording': 0,
            'min_gap_between_recordings': 0,
            'enable_realtime_transcription': True,
            'realtime_processing_pause': 0,
            'realtime_model_type': 'tiny.en',
            'on_realtime_transcription_stabilized': self.on_text,
            'on_vad_detect_stop': self.on_vad_detect_stop
        }

    def __init__(self, output: asyncio.Queue):
        self.output = output
        self.recorder_active = threading.Event()
        self.question = ""
        self.loop = asyncio.new_event_loop()
        threading.Thread(target=self.start_async_loop,
                         args=(self.loop,), daemon=True).start()
        self.processor = threading.Thread(
            target=self._text_processing_thread, daemon=True)
        self.processor.start()
        self.recorder_active.wait()

    def on_vad_detect_stop(self):
        self.question = ""

    def start_async_loop(self, loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def on_text(self, text):
        self.send_message({"message": "detected_text"})
        print(f"detected text {text}")

    def shutdown(self):
        self.recorder.shutdown() # hangs forever, I comment it out and get a resource leak
        self.recorder_active.clear()
        # self.processor.join()
        # self.loop.call_soon_threadsafe(self.loop.stop)

    def send_message(self, message):
        asyncio.run_coroutine_threadsafe(self.output.put(message), self.loop)

    def feed_chunk(self, audio_chunk: bytes):
        if not self.recorder_active.is_set():
            print("Attempted to run audio on shutdown audio client")
            # asyncio.Queue.put is a coroutine, so schedule it instead of calling it bare
            self.send_message({"message": "recorder_shutdown"})
        else:
            metadata_length = int.from_bytes(
                audio_chunk[:4], byteorder='little')
            metadata_json = audio_chunk[4:4+metadata_length].decode('utf-8')
            metadata = json.loads(metadata_json)

            sample_rate = metadata["sampleRate"]
            chunk = audio_chunk[4+metadata_length:]
            resampled_chunk = decode_and_resample(chunk, sample_rate, 16000)

            self.recorder.feed_audio(resampled_chunk)

    def _text_processing_thread(self):
        self.recorder = AudioToTextRecorder(**self.recorder_config())
        self.send_message({"message": "recorder_ready"})
        self.recorder_active.set()
        while self.recorder_active.is_set():
            full_sentence = self.recorder.text()
            if full_sentence is None or full_sentence == "":
                break
            print(f"full sentence: {full_sentence}")
            self.question += full_sentence

In the websocket endpoint:

output = asyncio.Queue()
audio = RealtimeAudio(output)

asyncio.get_event_loop().create_task(
    output_websockets(output, ws))

try:
    async for chunk in ws.iter_bytes():
        audio.feed_chunk(chunk)
except WebSocketDisconnect:
    print("Websocket disconnected!")
    audio.shutdown()

finally:
    audio.shutdown()

Owner

KoljaB commented Apr 21, 2024

Please try calling shutdown only once:

output = asyncio.Queue()
audio = RealtimeAudio(output)

asyncio.get_event_loop().create_task(
    output_websockets(output, ws))

try:
    async for chunk in ws.iter_bytes():
        audio.feed_chunk(chunk)
except WebSocketDisconnect:
    print("Websocket disconnected!")

finally:
    audio.shutdown()

Does it still happen then?
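
One way to make "call shutdown only once" hold even when several code paths try to clean up is to guard the call. The following is only a minimal sketch: `ShutdownOnce` is a hypothetical wrapper, not part of RealtimeSTT, and it assumes nothing about the wrapped object except that it has a `shutdown()` method.

```python
import threading


class ShutdownOnce:
    """Hypothetical wrapper that forwards shutdown() to a resource at most once."""

    def __init__(self, resource):
        self._resource = resource
        self._lock = threading.Lock()
        self._done = False

    def shutdown(self):
        # Decide under the lock whether this call is the first one.
        with self._lock:
            if self._done:
                return False  # Already shut down; ignore the repeat call.
            self._done = True
        # Forward outside the lock so a slow shutdown does not block other callers.
        self._resource.shutdown()
        return True
```

With a guard like this, both the `except` and the `finally` branch could call `shutdown()` and only the first call would reach the recorder.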


nsadeh commented Apr 23, 2024

So I actually tried this in a shell to make sure:

output = asyncio.Queue()
audio = RealtimeAudio(output)
audio.shutdown()

the last call hangs

Owner

KoljaB commented Apr 23, 2024

Ok, I'll check that


nsadeh commented Apr 23, 2024

@KoljaB I know this isn't directly related to this issue, but now that my code is up, would you be able to speak to the scalability of this approach? I appreciate the examples in this codebase but I am not sure how many recorder objects are intended to coexist in one Python application. Because of the separation of read and write to the recorder object and its internal state, I don't think it's possible to have fewer than one recorder object per websocket connection for the transcription-over-websocket architecture. Am I wrong?

Owner

KoljaB commented Apr 23, 2024

I thought about how far I want to develop each of my projects. For both RealtimeSTT and RealtimeTTS I want to provide a comfortable and stable webserver solution that works in a browser and covers full functionality. But serving multiple users is where I draw the line, because testing this is out of my scope and I did not find a personal use case for it. So for now I have decided to focus on delivering core functionality. I may currently have too many projects to bring every single one to its full potential.

Owner

KoljaB commented Apr 23, 2024

Found the issue. The recording worker needs chunks to be fed into it; otherwise it currently blocks in data = self.audio_queue.get(). This is not perfect, and I should integrate a timeout. In all my use cases so far it was more or less guaranteed that the microphone delivers chunks (or that an external source does so using the feed_audio method). I will probably change that soon so that it is safe to use in all cases. For now you should be able to shut down as soon as chunks come in, so you might feed silence on shutdown if you don't want to process chunks all the time.
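
The timeout idea mentioned above can be illustrated with a small stand-alone sketch. This is not the RealtimeSTT implementation: `QueueWorker`, `poll_interval`, and `processed` are hypothetical names, and only the `audio_queue.get()` call and the `feed_audio` name mirror what the comment describes. The point is that a `get(timeout=...)` lets the worker re-check its running flag, so `join()` returns even when no chunk ever arrives.

```python
import queue
import threading


class QueueWorker:
    """Hypothetical stand-in for a recording worker; not RealtimeSTT code."""

    def __init__(self, poll_interval=0.1):
        self.audio_queue = queue.Queue()
        self.is_running = True
        self.poll_interval = poll_interval
        self.processed = []
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        while self.is_running:
            try:
                # Wake up periodically instead of blocking forever on an empty queue.
                data = self.audio_queue.get(timeout=self.poll_interval)
            except queue.Empty:
                continue  # No chunk arrived; loop around and re-check the flag.
            self.processed.append(data)

    def feed_audio(self, chunk):
        self.audio_queue.put(chunk)

    def shutdown(self):
        self.is_running = False
        # The worker notices the flag within roughly one poll interval.
        self.thread.join()
```

A shorter `poll_interval` makes shutdown more responsive at the cost of more wake-ups while idle.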


nsadeh commented Apr 24, 2024

> I thought about how far I want to develop each of my projects. For both RealtimeSTT and RealtimeTTS I want to provide a comfortable and stable webserver solution that works in a browser and covers full functionality. But serving multiple users is where I draw the line, because testing this is out of my scope and I did not find a personal use case for it. So for now I have decided to focus on delivering core functionality. I may currently have too many projects to bring every single one to its full potential.

I am very grateful for the solution you built! It will help me build an MVP and inspire how we scale.


nsadeh commented Apr 26, 2024

@KoljaB I changed the shutdown method to the following:

def shutdown(self):
    # self.recorder_active.clear()
    self.recorder.feed_audio(b'AAAAAAAAAAA')
    self.recorder.shutdown()
    # self.processor.join()
    self.loop.call_soon_threadsafe(self.loop.stop)

And it still hangs on recorder.shutdown(). The strange thing is, when I ran the following code:

In [10]: from RealtimeSTT import AudioToTextRecorder
In [11]: a = AudioToTextRecorder(**{})
In [12]: a.feed_audio(b'AAAAAAAAAAA')
In [13]: a.shutdown()

it runs just fine. Do you have advice? I keep getting issues with resource leaks when I don't shut this down:

/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/resource_tracker.py:254: UserWarning: resource_tracker: There appear to be 124 leaked semaphore objects to clean up at shutdown
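
A note on the chunk fed above: b'AAAAAAAAAAA' is 11 bytes, which is not a whole number of 16-bit samples, and each byte is 0x41 rather than zero, so it is not silence. The earlier suggestion in this thread was to feed silence. A minimal sketch of building a well-formed silent chunk follows; `make_silence` is a hypothetical helper, the 16-bit sample width is assumed from the paInt16 format used earlier in the thread, and whether such a chunk is enough to unblock shutdown in this setup is not confirmed here.

```python
SAMPLE_WIDTH_BYTES = 2  # Assumption: 16-bit PCM, as with paInt16 earlier in the thread


def make_silence(num_samples: int, sample_width: int = SAMPLE_WIDTH_BYTES) -> bytes:
    """Return num_samples of digital silence (all-zero PCM bytes)."""
    if num_samples < 0:
        raise ValueError("num_samples must be non-negative")
    # bytes(n) creates n zero bytes, i.e. zero-amplitude samples.
    return bytes(num_samples * sample_width)


silence = make_silence(1024)

# Hypothetical usage, assuming `recorder` is an AudioToTextRecorder created
# with use_microphone=False:
#     recorder.feed_audio(silence)
#     recorder.shutdown()
```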
