-
Notifications
You must be signed in to change notification settings - Fork 394
/
base_transcriber.py
107 lines (79 loc) · 3.26 KB
/
base_transcriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from __future__ import annotations
import asyncio
import audioop
from opentelemetry import trace, metrics
from typing import Generic, TypeVar, Union
from vocode.streaming.models.audio_encoding import AudioEncoding
from vocode.streaming.models.model import BaseModel
from vocode.streaming.models.transcriber import TranscriberConfig
from vocode.streaming.utils.worker import AsyncWorker, ThreadAsyncWorker
# Module-level OpenTelemetry tracer and meter, both named after this module.
# NOTE(review): neither is referenced in the rest of this file; presumably
# used by importers / subclasses — confirm before removing.
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
class Transcription(BaseModel):
    """One transcription result; the element type of a transcriber's output_queue."""

    # Text of the transcription.
    message: str
    # Confidence value supplied by the producer; range is not fixed by this
    # module (presumably 0.0-1.0 — confirm per transcriber).
    confidence: float
    # Whether this result is final; presumably False for interim/partial results.
    is_final: bool
    # Extra flag set by producers; defaults to False. Meaning is defined by
    # callers outside this file — TODO confirm.
    is_interrupt: bool = False

    def __str__(self):
        # Renders message, confidence and is_final only; is_interrupt is omitted.
        return f"Transcription({self.message}, {self.confidence}, {self.is_final})"
# Type parameter for the config a transcriber is generic over; bound to
# TranscriberConfig so any subclass of it is accepted.
TranscriberConfigType = TypeVar("TranscriberConfigType", bound=TranscriberConfig)
class AbstractTranscriber(Generic[TranscriberConfigType]):
    """State and helpers shared by every transcriber base class.

    Holds the transcriber config and a mute flag, and can synthesize silent
    audio in the configured encoding to stand in for real audio while muted.
    """

    def __init__(self, transcriber_config: TranscriberConfigType):
        self.transcriber_config = transcriber_config
        self.is_muted = False

    def mute(self):
        """Set the mute flag."""
        self.is_muted = True

    def unmute(self):
        """Clear the mute flag."""
        self.is_muted = False

    def get_transcriber_config(self) -> TranscriberConfigType:
        """Return the config this transcriber was constructed with."""
        return self.transcriber_config

    async def ready(self):
        """Default readiness check; always returns True."""
        return True

    def create_silent_chunk(self, chunk_size, sample_width=2):
        """Return silence of ``chunk_size`` bytes in the configured encoding.

        Args:
            chunk_size: Byte length of the audio being replaced, measured in
                the transcriber's configured ``audio_encoding``.
            sample_width: Bytes per sample of the linear PCM silence that is
                encoded when the configured encoding is mu-law.

        Returns:
            ``chunk_size`` bytes of silence.

        Raises:
            ValueError: If the configured encoding is neither LINEAR16 nor MULAW.
        """
        audio_encoding = self.get_transcriber_config().audio_encoding
        if audio_encoding == AudioEncoding.LINEAR16:
            return b"\0" * chunk_size
        if audio_encoding == AudioEncoding.MULAW:
            # Mu-law stores one byte per sample, so ``chunk_size`` output bytes
            # need ``chunk_size`` linear samples of ``sample_width`` bytes each.
            # Encoding only ``chunk_size`` linear bytes would return a chunk
            # 1/sample_width as long as the audio it replaces.
            linear_audio = b"\0" * (chunk_size * sample_width)
            return audioop.lin2ulaw(linear_audio, sample_width)
        # Previously fell through and returned None, which callers would then
        # queue as if it were audio.
        raise ValueError(f"Cannot create silence for audio encoding {audio_encoding}")
class BaseAsyncTranscriber(AbstractTranscriber[TranscriberConfigType], AsyncWorker):
    """Transcriber base driven by an asyncio worker.

    Raw audio bytes flow in through ``input_queue``; ``Transcription`` objects
    flow out through ``output_queue``. Subclasses implement ``_run_loop``.
    """

    def __init__(
        self,
        transcriber_config: TranscriberConfigType,
    ):
        inbound: asyncio.Queue[bytes] = asyncio.Queue()
        outbound: asyncio.Queue[Transcription] = asyncio.Queue()
        self.input_queue = inbound
        self.output_queue = outbound
        AsyncWorker.__init__(self, inbound, outbound)
        AbstractTranscriber.__init__(self, transcriber_config)

    async def _run_loop(self):
        """Worker coroutine; concrete transcribers must override it."""
        raise NotImplementedError

    def send_audio(self, chunk):
        """Hand ``chunk`` to the worker, swapping in equal-size silence while muted."""
        outgoing = self.create_silent_chunk(len(chunk)) if self.is_muted else chunk
        self.consume_nonblocking(outgoing)

    def terminate(self):
        # Delegate shutdown explicitly to the AsyncWorker implementation.
        AsyncWorker.terminate(self)
class BaseThreadAsyncTranscriber(
    AbstractTranscriber[TranscriberConfigType], ThreadAsyncWorker
):
    """Transcriber base driven by a ThreadAsyncWorker.

    Raw audio bytes flow in through ``input_queue``; ``Transcription`` objects
    flow out through ``output_queue``. Subclasses implement the synchronous
    ``_run_loop`` (presumably executed off the event loop by
    ThreadAsyncWorker — confirm).
    """

    def __init__(
        self,
        transcriber_config: TranscriberConfigType,
    ):
        inbound: asyncio.Queue[bytes] = asyncio.Queue()
        outbound: asyncio.Queue[Transcription] = asyncio.Queue()
        self.input_queue = inbound
        self.output_queue = outbound
        ThreadAsyncWorker.__init__(self, inbound, outbound)
        AbstractTranscriber.__init__(self, transcriber_config)

    def _run_loop(self):
        """Synchronous worker loop; concrete transcribers must override it."""
        raise NotImplementedError

    def send_audio(self, chunk):
        """Hand ``chunk`` to the worker, swapping in equal-size silence while muted."""
        outgoing = self.create_silent_chunk(len(chunk)) if self.is_muted else chunk
        self.consume_nonblocking(outgoing)

    def terminate(self):
        # Delegate shutdown explicitly to the ThreadAsyncWorker implementation.
        ThreadAsyncWorker.terminate(self)
# Type alias covering both transcriber base classes defined in this module,
# each parameterized by the same config type variable.
BaseTranscriber = Union[
    BaseAsyncTranscriber[TranscriberConfigType],
    BaseThreadAsyncTranscriber[TranscriberConfigType],
]