In [1]:
# python-dotenv provides the `dotenv` module imported in the next cell.
%pip install --quiet amazon-transcribe soundcard numpy python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [3]:
import soundcard as sc
import numpy as np
import asyncio
from collections import deque
from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent
import time
from dotenv import load_dotenv

# Load variables from a .env file (if one is found) into the process environment.
# NOTE(review): presumably this supplies the AWS credentials used by the
# Transcribe client below — confirm.
load_dotenv()

False

In [4]:
# Show every input device soundcard can see, one per line.
print("Available microphones:")
for mic in sc.all_microphones():
    print(" -", mic.name)

Available microphones:
 - Family 17h/19h HD Audio Controller Analog Stereo


In [5]:
class SpeechMetrics:
    """Accumulates word counts, filler-word usage and a rolling speech rate.

    ``total_words`` and the ``filler_words`` tallies are cumulative since the
    instance was created; only the speech rate is computed over the rolling
    ``window_seconds`` window.
    """

    def __init__(self, window_seconds=60):
        """Create an empty tracker.

        Args:
            window_seconds: Length of the rolling window, in seconds, used by
                ``get_speech_rate``.
        """
        # Filler word/phrase -> number of times it has been heard so far.
        self.filler_words = {
            'um': 0, 'uh': 0, 'er': 0, 'ah': 0, 'like': 0,
            'you know': 0, 'sort of': 0, 'kind of': 0
        }
        self.total_words = 0
        self.window_seconds = window_seconds
        # One arrival timestamp per word; old entries are dropped in add_words.
        self.word_timestamps = deque()
        self.start_time = time.time()

    @staticmethod
    def _tokenize(text):
        """Return lower-cased word tokens of ``text`` with punctuation removed.

        Apostrophes are kept so contractions such as "it's" stay one token.
        """
        cleaned = ''.join(
            ch if ch.isalnum() or ch == "'" else ' ' for ch in text.lower()
        )
        return cleaned.split()

    @staticmethod
    def _count_phrase(tokens, phrase):
        """Count occurrences of ``phrase`` as a run of whole tokens in ``tokens``."""
        target = phrase.split()
        span = len(target)
        return sum(
            1
            for start in range(len(tokens) - span + 1)
            if tokens[start:start + span] == target
        )

    def add_words(self, text: str):
        """Record the words of one transcript segment.

        Updates the cumulative word total, appends one timestamp per word to
        the rolling window (dropping entries older than ``window_seconds``),
        and increments the filler-word tallies.

        Args:
            text: Transcript text to account for.
        """
        current_time = time.time()
        words = text.lower().split()

        # Update total words
        self.total_words += len(words)

        # Every word of this segment shares the segment's arrival time.
        self.word_timestamps.extend(current_time for _ in words)

        # Remove timestamps older than window_seconds
        while (self.word_timestamps and
               current_time - self.word_timestamps[0] > self.window_seconds):
            self.word_timestamps.popleft()

        # Count fillers on whole tokens. A plain substring count would also
        # hit 'er' in "never", 'um' in "number", 'like' in "likely", etc.
        tokens = self._tokenize(text)
        for filler in self.filler_words:
            self.filler_words[filler] += self._count_phrase(tokens, filler)

    def get_speech_rate(self):
        """Calculate words per minute in the current window.

        Returns:
            The rounded words-per-minute figure, or 0 when no words are in the
            window or no time has elapsed since construction.
        """
        if not self.word_timestamps:
            return 0

        # Count words in current window
        current_time = time.time()
        window_words = sum(
            1 for stamp in self.word_timestamps
            if current_time - stamp <= self.window_seconds
        )

        # Until a full window has elapsed, divide by the time actually
        # observed so the early rate is not underestimated.
        minutes = min(self.window_seconds / 60,
                      (current_time - self.start_time) / 60)
        return round(window_words / minutes) if minutes > 0 else 0

    def get_metrics_report(self):
        """Build a human-readable, multi-line summary of the current metrics.

        Returns:
            A string with the windowed speech rate, the cumulative word and
            filler totals, and a per-filler breakdown of non-zero counts.
        """
        wpm = self.get_speech_rate()
        total_fillers = sum(self.filler_words.values())

        report = f"\n{'='*50}\n"
        report += f"Speech Metrics (last {self.window_seconds} seconds):\n"
        report += f"Speech Rate: {wpm} words per minute\n"
        report += f"Total Words: {self.total_words}\n"
        report += f"Total Filler Words: {total_fillers}\n"
        report += "\nFiller Word Breakdown:\n"
        for word, count in self.filler_words.items():
            if count > 0:
                report += f"  - '{word}': {count}\n"
        report += f"{'='*50}"
        return report

metrics = SpeechMetrics(window_seconds=60)  # Track last 60 seconds

In [6]:
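# Earlier, minimal version of the handler (kept for reference): it prints every
# alternative of every result, partial ones included.
# class MyEventHandler(TranscriptResultStreamHandler):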
#     async def handle_transcript_event(self, transcript_event: TranscriptEvent):
#         results = transcript_event.transcript.results
#         for result in results:
#             for alt in result.alternatives:
#                 print(alt.transcript)

class MyEventHandler(TranscriptResultStreamHandler):
    """Prints finalized transcripts and feeds them into the global ``metrics``."""

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """Handle one transcript event from the output stream.

        Partial results are ignored. For each completed result, the transcript
        is printed and its words are added to ``metrics``.

        Args:
            transcript_event: Event whose ``transcript.results`` are processed.
        """
        for result in transcript_event.transcript.results:
            # Only process completed transcriptions
            if result.is_partial or not result.alternatives:
                continue

            # Alternatives are competing transcriptions of the same audio, so
            # feeding more than one into the metrics would count the same
            # speech several times. Use the first one only.
            transcript = result.alternatives[0].transcript
            print(f"\nTranscript: {transcript}")

            # Update metrics
            metrics.add_words(transcript)
            # print(metrics.get_metrics_report())

In [7]:
async def capture_audio(stream, samplerate=16000, chunk_size=1024):
    """Record from the default microphone and send 16-bit PCM to ``stream``.

    Runs until interrupted. The input stream is always closed with
    ``end_stream()`` on the way out, whether the loop ended through a
    ``KeyboardInterrupt`` or any other exception.

    Args:
        stream: Transcription stream; audio is sent through its
            ``input_stream``.
        samplerate: Recording sample rate in Hz. Must match the
            ``media_sample_rate_hz`` the stream was opened with
            (``main()`` uses 16000).
        chunk_size: Number of frames recorded and sent per chunk.
    """
    # Get default microphone
    mic = sc.default_microphone()

    try:
        with mic.recorder(samplerate=samplerate, channels=1, blocksize=chunk_size) as recorder:
            print("🎤 Listening... Press Ctrl+C to stop.")
            while True:
                # NOTE(review): record() is a synchronous call, so the event
                # loop (and the transcript handler) appears to stall for about
                # chunk_size / samplerate seconds per iteration. Consider
                # offloading it to a thread if latency matters.
                data = recorder.record(chunk_size)
                # Clip before scaling: float samples outside [-1, 1] would
                # otherwise overflow int16 and wrap around to the opposite sign.
                pcm = (np.clip(data, -1.0, 1.0) * 32767).astype(np.int16)
                await stream.input_stream.send_audio_event(audio_chunk=pcm.tobytes())
                # Yield to the event loop so transcript events can be handled.
                await asyncio.sleep(0.001)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        await stream.input_stream.end_stream()

In [8]:
async def main():
    # Create client
    client = TranscribeStreamingClient(region="us-east-1")  # Change region if needed

    # Start transcription stream
    stream = await client.start_stream_transcription(
        language_code="en-US",
        media_sample_rate_hz=16000,
        media_encoding="pcm"
    )

    # Create and start handler
    handler = MyEventHandler(stream.output_stream)
    
    # Start capturing and transcribing
    await asyncio.gather(capture_audio(stream), handler.handle_events())

# Run in Jupyter
await main()

🎤 Listening... Press Ctrl+C to stop.

Transcript: Testing

Transcript: Hello.

Transcript: It's not going as fast as it should be.


Traceback (most recent call last):
  File "/home/ben/.local/lib/python3.10/site-packages/awscrt/http.py", line 242, in _on_body
    self._on_body_cb(http_stream=self, chunk=chunk)
  File "/home/ben/.local/lib/python3.10/site-packages/amazon_transcribe/httpsession.py", line 100, in _on_body
    future.set_result(chunk)
  File "/gnu/store/igala9wg4wbv0d4b0rl2yh5yvy0aiyxh-python-3.10.7/lib/python3.10/concurrent/futures/_base.py", line 546, in set_result
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7f8471d7bfd0 state=cancelled>
Treating Python exception as error 3(AWS_ERROR_UNKNOWN)


CancelledError: 