forked from morevnaproject-org/papagayo-ng
-
Notifications
You must be signed in to change notification settings - Fork 0
/
auto_recognition.py
161 lines (139 loc) · 6.79 KB
/
auto_recognition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import json
import os
import string
import tempfile
from pathlib import Path
import time
import PySide2.QtCore as QtCore
import utilities
# Extend PATH with the app-data directory before importing pydub — presumably
# so bundled audio tools (e.g. ffmpeg) located there are found; TODO confirm
# against the packaging setup.
if utilities.get_app_data_path() not in os.environ['PATH']:
    os.environ['PATH'] += os.pathsep + utilities.get_app_data_path()
import pydub
from pydub.generators import WhiteNoise
from PySide2 import QtWidgets
from allosaurus.app import read_recognizer
from utilities import get_main_dir
class AutoRecognize:
    """Run Allosaurus phoneme recognition on an audio file.

    Converts the input sound to a 16 kHz mono WAV, benchmarks how long
    recognition takes (so a Qt progress bar can be driven from a worker
    thread), runs the recognizer, and maps the recognized IPA symbols to
    CMU phonemes via ``ipa_cmu.json``.
    """

    def __init__(self, sound_path):
        """Load settings, locate the Qt main window and prepare the WAV.

        :param sound_path: path of the audio file to recognize.
        """
        ini_path = os.path.join(utilities.get_app_data_path(), "settings.ini")
        self.settings = QtCore.QSettings(ini_path, QtCore.QSettings.IniFormat)
        self.settings.setFallbacksEnabled(False)  # File only, not OS registry.
        self.allo_model_path = Path(os.path.join(utilities.get_app_data_path(), "allosaurus_model"))
        self.temp_wave_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
        # CPU seconds needed to decode one second of audio; refined by
        # test_decode_time() below, 1 is just a safe starting estimate.
        self.duration_for_one_second = 1
        try:
            app = QtWidgets.QApplication.instance()
            self.main_window = None
            self.threadpool = QtCore.QThreadPool.globalInstance()
            for widget in app.topLevelWidgets():
                if isinstance(widget, QtWidgets.QMainWindow):
                    self.main_window = widget
        except AttributeError:
            # No QApplication is running (headless / CLI use).
            self.main_window = None
        self.sound_length = 0
        self.analysis_finished = False
        self.test_decode_time()
        self.convert_to_wav(sound_path)

    def test_decode_time(self):
        """Benchmark recognition speed on five seconds of white noise.

        Stores the measured CPU seconds per second of audio in
        ``self.duration_for_one_second`` so update_progress() can
        estimate the total run time.
        """
        sample = WhiteNoise().to_audio_segment(duration=5000)
        # The recognizer input format: 16-bit, 16 kHz, mono.
        sample = sample.set_sample_width(2)
        sample = sample.set_frame_rate(16000)
        sample = sample.set_channels(1)
        sample_path = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
        out_ = sample.export(sample_path, format="wav", bitrate="256k")
        out_.close()
        try:
            try:
                model = read_recognizer("latest", self.allo_model_path)
            except TypeError:
                # Older allosaurus versions take no model-path argument.
                model = read_recognizer("latest")
            start_time = time.process_time()
            model.recognize(sample_path, timestamp=True,
                            lang_id=self.settings.value("/VoiceRecognition/allo_lang_id", "eng"),
                            emit=float(self.settings.value("/VoiceRecognition/allo_emission", 1.0)))
            self.duration_for_one_second = (time.process_time() - start_time) / 5
        finally:
            # Don't leak the benchmark file even if recognition raises.
            os.remove(sample_path)

    def convert_to_wav(self, sound_path):
        """Convert ``sound_path`` into ``self.temp_wave_file`` (16 kHz mono WAV).

        Records the original duration in ``self.sound_length`` and appends
        half a second of silence so the tail of the audio is not cut off
        by the recognizer.
        """
        audio = pydub.AudioSegment.from_file(sound_path, format=os.path.splitext(sound_path)[1][1:])
        audio = audio.set_sample_width(2)
        audio = audio.set_frame_rate(16000)
        audio = audio.set_channels(1)
        half_second_silence = pydub.AudioSegment.silent(500)
        self.sound_length = audio.duration_seconds
        audio += half_second_silence
        out_ = audio.export(self.temp_wave_file, format="wav", bitrate="256k")
        out_.close()

    def update_progress(self, progress_callback):
        """Drive the status progress bar while recognition runs.

        Runs in a worker thread: estimates the finish time from the
        benchmark taken in test_decode_time() and emits 0-100 progress
        values until recognize_allosaurus() sets ``analysis_finished``.

        :param progress_callback: Qt signal with an ``emit(value)`` method.
        """
        # NOTE(review): process_time() is process CPU time, not wall clock;
        # kept as-is because test_decode_time() benchmarks the same way.
        expected_time_to_finish = self.sound_length * self.duration_for_one_second
        if expected_time_to_finish > 0:  # avoid division by zero on empty audio
            start_time = time.process_time()
            finish_time = start_time + expected_time_to_finish
            progress_multiplier = 100.0 / expected_time_to_finish
            while time.process_time() < finish_time:
                if self.analysis_finished:
                    break
                QtCore.QCoreApplication.processEvents()
                current_progress = (time.process_time() - start_time) * progress_multiplier
                progress_callback.emit(current_progress)
        self.main_window.lip_sync_frame.status_progress.hide()

    def recognize_allosaurus(self):
        """Recognize phonemes in the prepared WAV file.

        :return: ``(ipa_list, peaks, results)`` on success, else ``None``.
            ``ipa_list`` items are dicts with "start", "duration" and
            "phoneme" (the CMU phoneme, or None when the IPA symbol has
            no mapping); ``peaks`` are indices of local maxima in the
            inter-phoneme gap list; ``results`` is the raw recognizer text.
        """
        try:
            model = read_recognizer("latest", self.allo_model_path)
        except TypeError:
            model = read_recognizer("latest")
        if self.main_window:
            worker = utilities.Worker(self.update_progress)
            self.main_window.lip_sync_frame.status_progress.show()
            self.main_window.lip_sync_frame.status_progress.setMaximum(100)
            worker.signals.progress.connect(self.main_window.lip_sync_frame.status_bar_progress)
            self.threadpool.start(worker)
        results = model.recognize(self.temp_wave_file, timestamp=True,
                                  lang_id=self.settings.value("/VoiceRecognition/allo_lang_id", "eng"),
                                  emit=float(self.settings.value("/VoiceRecognition/allo_emission", 1.0)))
        self.analysis_finished = True
        ipa_list = []
        if results:
            # NOTE(review): this path is relative to the CWD; presumably it
            # should resolve next to the executable — confirm get_main_dir().
            with open("ipa_cmu.json", encoding="utf8") as mapping_file:
                ipa_convert = json.load(mapping_file)
            # Symbols stripped before lookup: stress digits and diacritics.
            stress_symbols = [*string.digits, "!", "+", "/", "#", "ː", "ʰ"]
            time_list = []
            prev_start = 0
            # Each result line is "<start> <duration> <phone>".
            for line in results.splitlines():
                start, dur, phone = line.split()
                phone = "".join(e for e in phone if e not in stress_symbols)
                if phone not in ipa_convert:
                    print("Missing conversion for: " + phone)
                    if self.main_window:
                        dlg = QtWidgets.QMessageBox()
                        dlg.setText("Missing conversion for: " + phone)
                        dlg.setWindowTitle("Missing Phoneme Conversion")
                        dlg.setWindowIcon(self.main_window.windowIcon())
                        dlg.setStandardButtons(QtWidgets.QMessageBox.Ok)
                        dlg.setDefaultButton(QtWidgets.QMessageBox.Ok)
                        dlg.setIcon(QtWidgets.QMessageBox.Information)
                        dlg.exec_()
                phone_dict = {"start": float(start), "duration": float(dur), "phoneme": ipa_convert.get(phone)}
                time_list.append(float(start) - prev_start)
                prev_start = float(start)
                ipa_list.append(phone_dict)
            time_list.append(self.sound_length - prev_start)
            peaks = self.get_level_peaks(time_list)
            return ipa_list, peaks, results
        else:
            return None

    def get_level_peaks(self, v):
        """Return indices of upper peaks in ``v``, plus both endpoints.

        Plateaus (runs of equal values) are skipped in one step; a plateau
        counts as a peak when both neighbors are strictly lower.

        :param v: sequence of comparable values.
        :return: list of indices, always starting with 0 and ending with
            ``len(v) - 1``.
        """
        peaks = [0]
        i = 1
        while i < len(v) - 1:
            pos_left = i
            pos_right = i
            # Walk off the plateau of values equal to v[i] in both directions.
            while v[pos_left] == v[i] and pos_left > 0:
                pos_left -= 1
            while v[pos_right] == v[i] and pos_right < len(v) - 1:
                pos_right += 1
            is_upper_peak = v[pos_left] < v[i] and v[i] > v[pos_right]
            if is_upper_peak:
                peaks.append(i)
            i = pos_right
        peaks.append(len(v) - 1)
        return peaks

    def __del__(self):
        # Best-effort cleanup: the temp file may never have been created
        # (failed __init__) or may already be gone.
        try:
            os.remove(self.temp_wave_file)
        except (AttributeError, OSError):
            pass