# V2

In [None]:
__name__

In [2]:
##### HAVE AN ISSUE HERE WITH gr.State 

import os
import sys
import logging

import gradio as gr

from speechgrid import SpeechGrid, SpeechGridInterface
from config import setup_logger, output_dir



# Optional command-line flag: exactly one extra argument, parsed as an integer
# (0 / non-zero), decides whether Gradio is launched with a public share link.
shareable = False
if len(sys.argv) == 2:
    try:
        shareable = bool(int(sys.argv[1]))
    except ValueError:
        # A single non-integer argument (e.g. one injected by a launcher)
        # must not abort start-up; keep sharing disabled and say so.
        print(f"Ignoring non-integer share flag {sys.argv[1]!r}; sharing stays disabled.",
              file=sys.stderr)


def init_speech_grid_interface(config_file='config.yaml'):
    """Construct the SpeechGridInterface that backs the GUI.

    Args:
        config_file: Configuration file path handed to the
            ``SpeechGridInterface`` constructor.

    Returns:
        The newly created ``SpeechGridInterface`` instance.
    """
    interface = SpeechGridInterface(config_file=config_file)
    return interface




def create_gradio_interface(speech_grid_interface):
    """Build the SpeechGrid Gradio GUI around *speech_grid_interface* and launch it.

    The GUI has a "Main" tab (audio input, task selection, processing and
    download buttons) and an "Advanced Options" tab (speaker counts, ASR
    language and language-model toggle). Every callback reads or updates the
    given interface object.

    Args:
        speech_grid_interface: Backend object providing ``get_tasks``,
            ``set_tasks``, ``set_speaker_numbers``,
            ``get_asr_available_lang``, ``get_asr_lang``, ``set_asr_lang``,
            ``get_lm_enable``, ``set_lm_enable``, ``process`` and the
            ``recorded_speech`` attribute.

    Returns:
        None. The function finishes by calling ``gui.queue().launch(...)``
        with the module-level ``shareable`` flag.
    """

    def _as_count(value):
        """Map an empty number field (``None``) to 0, this GUI's "unset" marker."""
        return 0 if value is None else value

    def reset_min_max(exact, min_v, max_v):
        """An exact speaker count overrides the range: zero min and max."""
        if _as_count(exact) > 0:
            min_v = 0
            max_v = 0
        speech_grid_interface.set_speaker_numbers(_as_count(exact),
                                                  _as_count(min_v),
                                                  _as_count(max_v))
        # Fields the user left empty are returned as-is so they stay empty.
        return exact, min_v, max_v

    def validate_min_max(exact, min_v, max_v):
        """A speaker range overrides the exact count and must satisfy min <= max."""
        lower = _as_count(min_v)
        upper = _as_count(max_v)
        if lower > 0 or upper > 0:
            exact = 0
            if lower > upper:
                # Raise the maximum to the minimum instead of rejecting the input.
                max_v = upper = lower
        speech_grid_interface.set_speaker_numbers(_as_count(exact), lower, upper)
        return exact, min_v, max_v

    def audio_stat(audio_value):
        """Enable the "Process Audio" button only while audio is present."""
        return gr.Button("Process Audio", interactive=bool(audio_value))

    def audio_record():
        """Mark the current audio as recorded from the microphone."""
        speech_grid_interface.recorded_speech = True

    def audio_upload():
        """Mark the current audio as an uploaded file."""
        speech_grid_interface.recorded_speech = False

    def set_tasks(tasks):
        """Forward the task selection to the backend."""
        speech_grid_interface.set_tasks(tasks)

    with gr.Blocks(title="SpeechGrid", theme=gr.themes.Soft()) as gui:

        with gr.Tab('Main'):

            record_audio = gr.Audio(sources=["microphone", "upload"], type="filepath")

            tasks = gr.CheckboxGroup(choices=[("Speech to Text", "ASR"),
                                              ("Speaker Separation", "SD"),
                                              ("Speech Detection", "VAD")],
                                     value=speech_grid_interface.get_tasks(),
                                     label="Tasks",
                                     info="Apply the following tasks:")
            tasks.input(set_tasks, inputs=tasks)

            # Disabled until audio_stat sees a recording or an upload.
            process = gr.Button("Process Audio", interactive=False)

            record_audio.input(audio_stat, inputs=record_audio, outputs=process)
            record_audio.stop_recording(audio_record)
            record_audio.upload(audio_upload)

            output_text = gr.Textbox(label='Progress', interactive=False)
            # Download buttons start hidden; the process callback returns
            # replacement buttons with visible=True.
            with gr.Row():
                with gr.Column():
                    d1 = gr.DownloadButton("Download output", visible=False)
                with gr.Column():
                    d2 = gr.DownloadButton("Download speech file", visible=False)

        with gr.Tab('Advanced Options'):
            gr.Markdown(
                """
                ### Speaker Separation
                Number of speakers
                """)
            with gr.Row():
                # Created without a value, so untouched fields reach the
                # handlers as None (handled by _as_count above).
                n_exact = gr.Number(label='Exact')
                n_min = gr.Number(label='Minimum')
                n_max = gr.Number(label='Maximum')

                n_exact.input(reset_min_max,
                              inputs=[n_exact, n_min, n_max],
                              outputs=[n_exact, n_min, n_max])
                n_min.input(validate_min_max,
                            inputs=[n_exact, n_min, n_max],
                            outputs=[n_exact, n_min, n_max])
                n_max.input(validate_min_max,
                            inputs=[n_exact, n_min, n_max],
                            outputs=[n_exact, n_min, n_max])
            gr.Markdown(
                """
                ### Speech to Text
                """
            )
            with gr.Row():
                # Dropdown choices are (key, value) pairs taken from the backend's mapping.
                avail_lang = list(speech_grid_interface.get_asr_available_lang().items())
                lang_drop = gr.Dropdown(label='Language', choices=avail_lang,
                                        value=speech_grid_interface.get_asr_lang())
                lang_drop.change(speech_grid_interface.set_asr_lang,
                                 inputs=lang_drop)

                is_lm_enabled = speech_grid_interface.get_lm_enable()
                lm_enable = gr.Checkbox(label='Enable Language Model',
                                        value=is_lm_enabled, interactive=True)
                lm_enable.change(speech_grid_interface.set_lm_enable,
                                 inputs=lm_enable)

        process.click(speech_grid_interface.process,
                      inputs=record_audio,
                      outputs=[output_text, d1, d2])

    gui.queue().launch(share=shareable)

if __name__ == "__main__":
    # Set up logging before anything else is constructed.
    setup_logger()
    logger = logging.getLogger(__name__)

    # Build the backend from the default configuration file, then hand it to
    # the GUI builder, which also launches the app.
    speech_grid_interface = init_speech_grid_interface('config.yaml')
    create_gradio_interface(speech_grid_interface)

 

  from .autonotebook import tqdm as notebook_tqdm
2024-10-21 22:35:32,648 - httpx - INFO - HTTP Request: GET http://127.0.0.1:7860/startup-events "HTTP/1.1 200 OK"
2024-10-21 22:35:32,660 - httpx - INFO - HTTP Request: HEAD http://127.0.0.1:7860/ "HTTP/1.1 200 OK"


Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


2024-10-21 22:35:32,917 - httpx - INFO - HTTP Request: GET https://checkip.amazonaws.com/ "HTTP/1.1 200 "
2024-10-21 22:35:32,984 - httpx - INFO - HTTP Request: GET https://checkip.amazonaws.com/ "HTTP/1.1 200 "
2024-10-21 22:35:33,289 - httpx - INFO - HTTP Request: GET https://api.gradio.app/pkg-version "HTTP/1.1 200 OK"
--------
2024-10-21 22:35:55,168 - speechgrid - INFO - Loading ASR Model...
Loading the LM will be faster if you build a binary file.
2024-10-21 22:35:58,953 - pyctcdecode.decoder - INFO - Using arpa instead of binary LM file, decoder instantiation might be slow.
Reading /Users/z5173707/root/projects/speechgrid/Models/ASR/LM/ngram/4gram_big.arpa
----5---10---15---20---25---30---35---40---45---50---55---60---65---70---75---80---85---90---95--100
****************************************************************************************************
2024-10-21 22:35:58,993 - pyctcdecode.alphabet - INFO - Alphabet determined to be of regular style.
2024-10-21 22:35:58,993 - 

In [None]:
   
    
#TODO: Progress bar
#TODO: VAD ########DONE
#TODO: Number of speakers  ######DONE
#TODO: Word alignment
#TODO: Download button  ######DONE
#TODO: Wrap in a Docker container #####DONE
#TODO: Test pyannote offline  ########DONE
#TODO: Logging of errors and info    #########IN PROGRESS
#TODO: ASR with LM  ########DONE
#TODO: Process batch
#TODO: Kaldi ASR
#TODO: MMS ASR   ######DONE
#TODO: Add parameters selection for ASR, SD, VAD
#TODO: Rewrite the textgrid
#TODO: TextGrid code to get the logger and use logging instead of print.
#TODO: Add logging to other packages
#TODO: Use .bin instead of ARPA in LM
#TODO: ASR add the expected words
#TODO: Consider control the offset in interval ASR
#TODO: Add parameters for min silence duration #NEED TO UNDERSTAND THESE PARAMETERS WELL FIRST
#TODO: Create set, get for speaker number
#TODO: Name of file as the uploaded file
#TODO: Make process enable after loading or recording ####DONE####
#TODO: Use import tempfile to access the temp dir if need to do so
#TODO: Review the use of get and set, use @property instead or direct access
#TODO: Add nemo diarization

#TODO: What should happen when a minimum is specified but the maximum is left at 0?

def reset_min_max(exact,min_v,max_v):
    """Handle an edit of the "Exact" speaker-count field.

    A positive exact count overrides the range, so minimum and maximum are
    reset to 0. The resulting numbers are stored on the module-level
    ``speech_grid_interface``.

    Args:
        exact: Value of the "Exact" field, or None when the field is empty.
        min_v: Value of the "Minimum" field, or None when empty.
        max_v: Value of the "Maximum" field, or None when empty.

    Returns:
        The ``(exact, min_v, max_v)`` tuple to write back into the fields.
    """
    # Number fields created without a value deliver None; comparing None with
    # 0 raises TypeError, so treat an empty field as 0 (the "unset" marker).
    if (exact or 0) > 0:
        min_v = 0
        max_v = 0
    speech_grid_interface.set_speaker_numbers(exact or 0, min_v or 0, max_v or 0)
    # Empty fields are returned unchanged so they stay empty in the GUI.
    return exact, min_v, max_v

def validate_min_max(exact, min_v, max_v):
    """Handle an edit of the "Minimum" or "Maximum" speaker-count field.

    A positive minimum or maximum overrides the exact count (reset to 0), and
    a maximum below the minimum is raised to the minimum. The resulting
    numbers are stored on the module-level ``speech_grid_interface``.

    Args:
        exact: Value of the "Exact" field, or None when the field is empty.
        min_v: Value of the "Minimum" field, or None when empty.
        max_v: Value of the "Maximum" field, or None when empty.

    Returns:
        The ``(exact, min_v, max_v)`` tuple to write back into the fields.
    """
    # Number fields created without a value deliver None; ordering None
    # against a number raises TypeError, so compare on 0-normalised copies.
    lower = min_v or 0
    upper = max_v or 0
    if lower > 0 or upper > 0:
        exact = 0
        if lower > upper:
            max_v = upper = lower
    speech_grid_interface.set_speaker_numbers(exact or 0, lower, upper)
    # Empty fields are returned unchanged so they stay empty in the GUI.
    return exact, min_v, max_v

def audio_stat(audio_value):
    """Return the "Process Audio" button, enabled only when audio is present.

    Args:
        audio_value: Current value of the audio component; any falsy value
            means no audio is available.
    """
    has_audio = bool(audio_value)
    return gr.Button("Process Audio", interactive=has_audio)

def audio_record():
    """Mark the current audio as a microphone recording.

    Registered on the audio component's ``stop_recording`` event.
    """
    speech_grid_interface.recorded_speech = True

def audio_upload():
    """Mark the current audio as an uploaded file.

    Registered on the audio component's ``upload`` event.
    """
    speech_grid_interface.recorded_speech = False

# Assemble the SpeechGrid GUI: a "Main" tab for audio input and processing,
# and an "Advanced Options" tab for speaker-count and ASR settings.
with gr.Blocks(title="SpeechGrid", theme=gr.themes.Soft()) as gui:

    with gr.Tab('Main'):
        record_audio = gr.Audio(
            sources=["microphone", "upload"],
            type="filepath",
        )

        tasks = gr.CheckboxGroup(
            choices=[
                ("Speech to Text", "ASR"),
                ("Speaker Separation", "SD"),
                ("Speech Detection", "VAD"),
            ],
            value=speech_grid_interface.get_tasks(),
            label="Tasks",
            info="Apply the following tasks:",
        )
        tasks.input(speech_grid_interface.set_tasks, inputs=tasks)

        # Starts disabled; audio_stat enables it once audio is available.
        process = gr.Button("Process Audio", interactive=False)

        record_audio.input(audio_stat, inputs=record_audio, outputs=process)
        record_audio.stop_recording(audio_record)
        record_audio.upload(audio_upload)

        output_text = gr.Textbox(label='Progress', interactive=False)
        # Both download buttons start hidden; they are outputs of the
        # process click handler below.
        with gr.Row():
            with gr.Column():
                d1 = gr.DownloadButton("Download output", visible=False)
            with gr.Column():
                d2 = gr.DownloadButton("Download speech file", visible=False)

    with gr.Tab('Advanced Options'):
        gr.Markdown(
            """
            ### Speaker Separation
            Number of speakers
            """)
        with gr.Row():
            n_exact = gr.Number(label='Exact')
            n_min = gr.Number(label='Minimum')
            n_max = gr.Number(label='Maximum')

            # Every edit re-validates all three fields together.
            n_exact.input(
                reset_min_max,
                inputs=[n_exact, n_min, n_max],
                outputs=[n_exact, n_min, n_max],
            )
            n_min.input(
                validate_min_max,
                inputs=[n_exact, n_min, n_max],
                outputs=[n_exact, n_min, n_max],
            )
            n_max.input(
                validate_min_max,
                inputs=[n_exact, n_min, n_max],
                outputs=[n_exact, n_min, n_max],
            )
        gr.Markdown(
            """
            ### Speech to Text
            """
        )
        with gr.Row():
            # Dropdown choices are (key, value) pairs from the backend's mapping.
            avail_lang = list(speech_grid_interface.get_asr_available_lang().items())
            lang_drop = gr.Dropdown(
                label='Language',
                choices=avail_lang,
                value=speech_grid_interface.get_asr_lang(),
            )
            lang_drop.change(speech_grid_interface.set_asr_lang, inputs=lang_drop)

            is_lm_enabled = speech_grid_interface.get_lm_enable()
            lm_enable = gr.Checkbox(
                label='Enable Language Model',
                value=is_lm_enabled,
                interactive=True,
            )
            lm_enable.change(speech_grid_interface.set_lm_enable, inputs=lm_enable)

    process.click(
        speech_grid_interface.process,
        inputs=record_audio,
        outputs=[output_text, d1, d2],
    )

gui.queue().launch(share=shareable)

# V1

In [None]:
import os
import logging
import soundfile as sf
import gradio as gr

from speechgrid import SpeechGrid
from config import setup_logger, output_dir
from core.utils import generate_file_basename, load_speech_file, zip_files


setup_logger()
logger = logging.getLogger(__name__)

class SpeechGridInterface(SpeechGrid):
    """SpeechGrid subclass adding the state and entry points used by the Gradio GUI.

    It keeps the list of selected tasks, remembers whether the current audio
    was recorded or uploaded, and turns a processing run into the values the
    GUI's output components expect.
    """

    def __init__(self, config_file):
        """Initialise the SpeechGrid base and the GUI-side state.

        Args:
            config_file: Configuration file path forwarded to ``SpeechGrid``.
        """
        super().__init__(config_file=config_file)
        # Task codes currently selected in the GUI (e.g. 'ASR', 'SD', 'VAD').
        self.tasks = []
        # True when the user recorded the audio on the fly instead of uploading it.
        self.recorded_speech = False

    def add_task(self, task):
        """Select *task* unless it is already selected."""
        if task not in self.tasks:
            self.tasks.append(task)

    def remove_task(self, task):
        """Deselect *task* if it is currently selected."""
        if task in self.tasks:
            self.tasks.remove(task)

    def set_tasks(self, tasks):
        """Replace the selected tasks with *tasks* (stored as given, not copied)."""
        self.tasks = tasks

    def get_tasks(self):
        """Return the currently selected tasks."""
        return self.tasks

    @staticmethod
    def _download_button(created, path, ok_label, error_label):
        """Return a visible DownloadButton: active on success, disabled with an error label otherwise."""
        if created:
            return gr.DownloadButton(label=ok_label, value=path,
                                     interactive=True, visible=True)
        return gr.DownloadButton(label=error_label, value=None,
                                 interactive=False, visible=True)

    def process(self, path, mode='single', progress=gr.Progress()):
        """Run the selected tasks on a speech file and build the GUI outputs.

        Args:
            path: Path of the speech file to process.
            mode: Processing mode; only 'single' is implemented.
            progress: Gradio progress tracker, passed on to ``process_file``.

        Returns:
            For ``mode == 'single'``, a three-item list: a status message, the
            download button for the output archive and the download button
            for the saved speech file. For any other mode, None.
        """
        self.load_tasks(self.get_tasks())

        if mode != 'single':
            # No other mode exists yet; fall through with no outputs.
            return None

        speech_file, output_zip_file, wav_file_created, out_file_created = \
            self.process_file(path, progress=progress)

        data_button = self._download_button(out_file_created, output_zip_file,
                                            "Download output file",
                                            "Error in archiving data")
        speech_button = self._download_button(wav_file_created, speech_file,
                                              "Download speech file",
                                              "Error in saving speech file")
        return ["Processing completed..", data_button, speech_button]

    def apply_tasks_to_speech(self, task_pipeline, speech, basename, sr=16000, progress = gr.Progress()):
        """Run each task of *task_pipeline* on *speech* and write its output files.

        Args:
            task_pipeline: Ordered task codes ('ASR', 'VAD', 'SD'); other
                codes are logged and counted for progress but do nothing.
            speech: Loaded speech signal handed to the task engines.
            basename: File-name stem for every output written to ``output_dir``.
            sr: Sampling rate passed to the engines.
            progress: Gradio progress tracker, updated once per task.

        Returns:
            Paths of the TextGrid files written, in pipeline order.
        """
        out_textgrid = []

        # Same scale as before: steps are spread over len(task_pipeline) + 2
        # slots, so the bar never reaches 100% inside this loop.
        total_steps = len(task_pipeline) + 2
        for step, task in enumerate(task_pipeline, start=1):
            logger.info('Applying %s...', task)
            progress(step / total_steps, desc=f"Applying {task}")

            if task == 'ASR':
                asr_engine = self.loaded_tasks['ASR']
                textgrid_file = os.path.join(output_dir, f'{basename}_ASR.TextGrid')
                if out_textgrid:
                    # An earlier task produced a TextGrid: pass the most
                    # recent one to process_intervals.
                    asr_engine.process_intervals(speech, out_textgrid[-1], sr=sr, offset_sec=0,
                                                 speech_label=self.speech_label)
                else:
                    asr_engine.process_speech(speech)
                asr_engine.write_textgrid(textgrid_file)
                out_textgrid.append(textgrid_file)

            elif task == 'VAD':
                vad_engine = self.loaded_tasks['VAD']
                rttm_file = os.path.join(output_dir, f'{basename}_VAD.rttm')
                textgrid_file = os.path.join(output_dir, f'{basename}_VAD.TextGrid')
                vad_engine.process_speech(speech, sr)
                vad_engine.write_rttm(rttm_file)
                vad_engine.write_textgrid(textgrid_file, speech_label=self.speech_label)
                out_textgrid.append(textgrid_file)

            elif task == 'SD':
                sd_engine = self.loaded_tasks['SD']
                rttm_file = os.path.join(output_dir, f'{basename}_SD.rttm')
                textgrid_file = os.path.join(output_dir, f'{basename}_SD.TextGrid')
                n_exact_speakers, n_min_speakers, n_max_speakers = self.get_speaker_numbers()
                sd_engine.process_speech(speech=speech,
                                         sr=sr,
                                         n_exact_speakers=n_exact_speakers,
                                         n_min_speakers=n_min_speakers,
                                         n_max_speakers=n_max_speakers)
                sd_engine.write_rttm(rttm_file)
                sd_engine.write_textgrid(textgrid_file, speech_label=self.speech_label)
                out_textgrid.append(textgrid_file)

        return out_textgrid

    def process_file(self, speech_file, progress=gr.Progress()):
        """Load one speech file, run the task pipeline and save the outputs.

        Args:
            speech_file: Path of the speech file to load.
            progress: Gradio progress tracker.

        Returns:
            ``(speech_file, output_zip_file, wav_file_created,
            out_file_created)``: the path of the saved WAV copy, the path of
            the output archive, and one success flag for each.

        Raises:
            Exception: Whatever ``load_speech_file`` raises is logged and
                re-raised. Failures while saving the WAV copy or the archive
                are logged and reported through the flags instead.
        """
        if self.recorded_speech:
            # Recordings have no meaningful file name, so generate one.
            basename = generate_file_basename()
        else:
            basename = os.path.splitext(os.path.basename(speech_file))[0]

        progress(0, desc="Loading Speech File...")
        logger.info('Loading Speech File...')

        try:
            speech, sr, duration = load_speech_file(speech_file)
        except Exception as e:
            logger.exception('Failed to load the speech file %s, %s', speech_file, e)
            raise

        tasks = set(self.get_tasks())
        task_pipeline = self.create_task_pipeline(tasks, duration)

        # %-style arguments instead of an f-string that reuses its own quote
        # character, which is a SyntaxError before Python 3.12.
        logger.info('Start processing, following tasks will be performed on %s %s',
                    basename, ','.join(task_pipeline))

        # Loading task engines
        self.load_tasks(task_pipeline)

        out_textgrid = self.apply_tasks_to_speech(task_pipeline, speech, basename, sr, progress)

        output_zip_file = os.path.join(output_dir, f'{basename}_output.zip')

        progress(1, desc="Generate output files")
        logger.info('Saving output files in %s, %s', output_dir, basename)

        # Save a copy of the loaded speech as WAV (per the original note: 16 kHz,
        # mono, 16-bit; that depends on load_speech_file, confirm there).
        wav_file_created = True
        out_file_created = True
        try:
            speech_file = os.path.join(output_dir, f'{basename}.wav')
            sf.write(speech_file, speech, sr)
        except Exception as e:
            logger.exception('Failed to save the speech file %s, %s', speech_file, e)
            wav_file_created = False

        try:
            zip_files(out_textgrid, output_zip_file)
        except Exception as e:
            logger.exception('Failed to create output archive in %s, %s', output_zip_file, e)
            out_file_created = False

        progress(1, desc=f"Processing {basename} completed..")
        logger.info('Processing %s is completed', basename)

        return (speech_file, output_zip_file, wav_file_created, out_file_created)
        
        

In [None]:
# Build the interface from the project's configuration file ('config.yaml',
# the same path the V2 entry point passes); 'configh.yaml' looked like a typo.
speech_grid_interface = SpeechGridInterface(config_file='config.yaml')

In [None]:
# Scratch check of the basename expression used in process_file: it strips the
# directory and the extension, so this evaluates to 'user_install'.
os.path.splitext(os.path.basename('/en/stable/user_install.html'))[0]

In [None]:
shareable = False
from core.utils import generate_file_basename, load_speech_file, zip_files
import soundfile as sf
#TODO: Progress bar
#TODO: VAD ########DONE
#TODO: Number of speakers  ######DONE
#TODO: Word alignment
#TODO: Download button  ######DONE
#TODO: Wrap in a Docker container #####DONE
#TODO: Test pyannote offline  ########DONE
#TODO: Logging of errors and info    #########IN PROGRESS
#TODO: ASR with LM  ########DONE
#TODO: Process batch
#TODO: Kaldi ASR
#TODO: MMS ASR   ######DONE
#TODO: Add parameters selection for ASR, SD, VAD
#TODO: Rewrite the textgrid
#TODO: TextGrid code to get the logger and use logging instead of print.
#TODO: Add logging to other packages
#TODO: Use .bin instead of ARPA in LM
#TODO: ASR add the expected words
#TODO: Consider control the offset in interval ASR
#TODO: Add parameters for min silence duration #NEED TO UNDERSTAND THESE PARAMETERS WELL FIRST
#TODO: Create set, get for speaker number
#TODO: Name of file as the uploaded file
#TODO: Make process enable after loading or recording ####DONE####
#TODO: Use import tempfile to access the temp dir if need to do so
#TODO: Review the use of get and set, use @property instead or direct access

def reset_min_max(exact,min_v,max_v):
    """Handle an edit of the "Exact" speaker-count field.

    A positive exact count overrides the range, so minimum and maximum are
    reset to 0. The resulting numbers are stored on the module-level
    ``speech_grid_interface``.

    Args:
        exact: Value of the "Exact" field, or None when the field is empty.
        min_v: Value of the "Minimum" field, or None when empty.
        max_v: Value of the "Maximum" field, or None when empty.

    Returns:
        The ``(exact, min_v, max_v)`` tuple to write back into the fields.
    """
    # Number fields created without a value deliver None; comparing None with
    # 0 raises TypeError, so treat an empty field as 0 (the "unset" marker).
    if (exact or 0) > 0:
        min_v = 0
        max_v = 0
    speech_grid_interface.set_speaker_numbers(exact or 0, min_v or 0, max_v or 0)
    # Empty fields are returned unchanged so they stay empty in the GUI.
    return exact, min_v, max_v

def validate_min_max(exact, min_v, max_v):
    """Handle an edit of the "Minimum" or "Maximum" speaker-count field.

    A positive minimum or maximum overrides the exact count (reset to 0), and
    a maximum below the minimum is raised to the minimum. The resulting
    numbers are stored on the module-level ``speech_grid_interface``.

    Args:
        exact: Value of the "Exact" field, or None when the field is empty.
        min_v: Value of the "Minimum" field, or None when empty.
        max_v: Value of the "Maximum" field, or None when empty.

    Returns:
        The ``(exact, min_v, max_v)`` tuple to write back into the fields.
    """
    # Number fields created without a value deliver None; ordering None
    # against a number raises TypeError, so compare on 0-normalised copies.
    lower = min_v or 0
    upper = max_v or 0
    if lower > 0 or upper > 0:
        exact = 0
        if lower > upper:
            max_v = upper = lower
    speech_grid_interface.set_speaker_numbers(exact or 0, lower, upper)
    # Empty fields are returned unchanged so they stay empty in the GUI.
    return exact, min_v, max_v

def audio_stat(audio_value):
    """Return the "Process Audio" button, enabled only when audio is present.

    When audio is present its value is also printed to stdout.

    Args:
        audio_value: Current value of the audio component; any falsy value
            means no audio is available.
    """
    has_audio = bool(audio_value)
    if has_audio:
        print(audio_value)
    return gr.Button("Process Audio", interactive=has_audio)

def audio_record():
    """Mark the current audio as a microphone recording.

    Registered on the audio component's ``stop_recording`` event.
    """
    speech_grid_interface.recorded_speech = True

def audio_upload():
    """Mark the current audio as an uploaded file.

    Registered on the audio component's ``upload`` event.
    """
    speech_grid_interface.recorded_speech = False

# Assemble the SpeechGrid GUI: a "Main" tab for audio input and processing,
# and an "Advanced Options" tab for speaker-count and ASR settings.
with gr.Blocks(title="SpeechGrid", theme=gr.themes.Soft()) as gui:

    with gr.Tab('Main'):
        record_audio = gr.Audio(
            sources=["microphone", "upload"],
            type="filepath",
        )

        tasks = gr.CheckboxGroup(
            choices=[
                ("Speech to Text", "ASR"),
                ("Speaker Separation", "SD"),
                ("Speech Detection", "VAD"),
            ],
            value=speech_grid_interface.get_tasks(),
            label="Tasks",
            info="Apply the following tasks:",
        )
        tasks.input(speech_grid_interface.set_tasks, inputs=tasks)

        # Starts disabled; audio_stat enables it once audio is available.
        process = gr.Button("Process Audio", interactive=False)

        record_audio.input(audio_stat, inputs=record_audio, outputs=process)
        record_audio.stop_recording(audio_record)
        record_audio.upload(audio_upload)

        output_text = gr.Textbox(label='Progress', interactive=False)
        # Both download buttons start hidden; they are outputs of the
        # process click handler below.
        with gr.Row():
            with gr.Column():
                d1 = gr.DownloadButton("Download output", visible=False)
            with gr.Column():
                d2 = gr.DownloadButton("Download speech file", visible=False)

    with gr.Tab('Advanced Options'):
        gr.Markdown(
            """
            ### Speaker Separation
            Number of speakers
            """)
        with gr.Row():
            n_exact = gr.Number(label='Exact')
            n_min = gr.Number(label='Minimum')
            n_max = gr.Number(label='Maximum')

            # Every edit re-validates all three fields together.
            n_exact.input(
                reset_min_max,
                inputs=[n_exact, n_min, n_max],
                outputs=[n_exact, n_min, n_max],
            )
            n_min.input(
                validate_min_max,
                inputs=[n_exact, n_min, n_max],
                outputs=[n_exact, n_min, n_max],
            )
            n_max.input(
                validate_min_max,
                inputs=[n_exact, n_min, n_max],
                outputs=[n_exact, n_min, n_max],
            )
        gr.Markdown(
            """
            ### Speech to Text
            """
        )
        with gr.Row():
            # Dropdown choices are (key, value) pairs from the backend's mapping.
            avail_lang = list(speech_grid_interface.get_asr_available_lang().items())
            lang_drop = gr.Dropdown(
                label='Language',
                choices=avail_lang,
                value=speech_grid_interface.get_asr_lang(),
            )
            lang_drop.change(speech_grid_interface.set_asr_lang, inputs=lang_drop)

            is_lm_enabled = speech_grid_interface.get_lm_enable()
            lm_enable = gr.Checkbox(
                label='Enable Language Model',
                value=is_lm_enabled,
                interactive=True,
            )
            lm_enable.change(speech_grid_interface.set_lm_enable, inputs=lm_enable)

    process.click(
        speech_grid_interface.process,
        inputs=record_audio,
        outputs=[output_text, d1, d2],
    )

gui.queue().launch(share=shareable)

# V0

In [None]:
import os
import logging
import soundfile as sf
import gradio as gr

from speechgrid import SpeechGrid
from config import setup_logger, output_dir
from core.utils import generate_file_basename, load_speech_file, zip_files


setup_logger()
logger = logging.getLogger(__name__)

class SpeechGridInterface:
    """Wrapper that connects a ``SpeechGrid`` instance to the Gradio GUI.

    It owns the SpeechGrid object, keeps the list of selected tasks and turns
    a processing run into the values the GUI's output components expect.
    """

    def __init__(self, config_file):
        """Create the wrapped SpeechGrid and the GUI-side state.

        Args:
            config_file: Configuration file path forwarded to ``SpeechGrid``.
        """
        self.speech_grid = SpeechGrid(config_file=config_file)
        # Task codes currently selected in the GUI (e.g. 'ASR', 'SD', 'VAD').
        self.tasks = []

    def add_task(self, task):
        """Select *task* unless it is already selected."""
        if task not in self.tasks:
            self.tasks.append(task)

    def remove_task(self, task):
        """Deselect *task* if it is currently selected."""
        if task in self.tasks:
            self.tasks.remove(task)

    def set_tasks(self, tasks):
        """Replace the selected tasks with *tasks* (stored as given, not copied)."""
        self.tasks = tasks

    def get_tasks(self):
        """Return the currently selected tasks."""
        return self.tasks

    @staticmethod
    def _download_button(created, path, ok_label, error_label):
        """Return a visible DownloadButton: active on success, disabled with an error label otherwise."""
        if created:
            return gr.DownloadButton(label=ok_label, value=path,
                                     interactive=True, visible=True)
        return gr.DownloadButton(label=error_label, value=None,
                                 interactive=False, visible=True)

    def process(self, path, mode='single', progress=gr.Progress()):
        """Run the selected tasks on a speech file and build the GUI outputs.

        Args:
            path: Path of the speech file to process.
            mode: Processing mode; only 'single' is implemented.
            progress: Gradio progress tracker, passed on to ``process_file``.

        Returns:
            For ``mode == 'single'``, a three-item list: a status message, the
            download button for the output archive and the download button
            for the saved speech file. For any other mode, None.
        """
        self.speech_grid.load_tasks(self.get_tasks())

        if mode != 'single':
            # No other mode exists yet; fall through with no outputs.
            return None

        speech_file, output_zip_file, wav_file_created, out_file_created = \
            self.process_file(path, progress=progress)

        data_button = self._download_button(out_file_created, output_zip_file,
                                            "Download output file",
                                            "Error in archiving data")
        speech_button = self._download_button(wav_file_created, speech_file,
                                              "Download speech file",
                                              "Error in saving speech file")
        return ["Processing completed..", data_button, speech_button]

    def apply_tasks_to_speech(self, task_pipeline, speech, basename, sr=16000, progress = gr.Progress()):
        """Run each task of *task_pipeline* on *speech* and write its output files.

        Args:
            task_pipeline: Ordered task codes ('ASR', 'VAD', 'SD'); other
                codes are logged and counted for progress but do nothing.
            speech: Loaded speech signal handed to the task engines.
            basename: File-name stem for every output written to ``output_dir``.
            sr: Sampling rate passed to the engines.
            progress: Gradio progress tracker, updated once per task.

        Returns:
            Paths of the TextGrid files written, in pipeline order.
        """
        out_textgrid = []
        engines = self.speech_grid.loaded_tasks
        speech_label = self.speech_grid.speech_label

        # Same scale as before: steps are spread over len(task_pipeline) + 2
        # slots, so the bar never reaches 100% inside this loop.
        total_steps = len(task_pipeline) + 2
        for step, task in enumerate(task_pipeline, start=1):
            logger.info('Applying %s...', task)
            progress(step / total_steps, desc=f"Applying {task}")

            if task == 'ASR':
                asr_engine = engines['ASR']
                textgrid_file = os.path.join(output_dir, f'{basename}_ASR.TextGrid')
                if out_textgrid:
                    # An earlier task produced a TextGrid: pass the most
                    # recent one to process_intervals.
                    asr_engine.process_intervals(speech, out_textgrid[-1], sr=sr, offset_sec=0,
                                                 speech_label=speech_label)
                else:
                    asr_engine.process_speech(speech)
                asr_engine.write_textgrid(textgrid_file)
                out_textgrid.append(textgrid_file)

            elif task == 'VAD':
                vad_engine = engines['VAD']
                rttm_file = os.path.join(output_dir, f'{basename}_VAD.rttm')
                textgrid_file = os.path.join(output_dir, f'{basename}_VAD.TextGrid')
                vad_engine.process_speech(speech, sr)
                vad_engine.write_rttm(rttm_file)
                vad_engine.write_textgrid(textgrid_file, speech_label=speech_label)
                out_textgrid.append(textgrid_file)

            elif task == 'SD':
                sd_engine = engines['SD']
                rttm_file = os.path.join(output_dir, f'{basename}_SD.rttm')
                textgrid_file = os.path.join(output_dir, f'{basename}_SD.TextGrid')
                n_exact_speakers, n_min_speakers, n_max_speakers = \
                    self.speech_grid.get_speaker_numbers()
                sd_engine.process_speech(speech=speech,
                                         sr=sr,
                                         n_exact_speakers=n_exact_speakers,
                                         n_min_speakers=n_min_speakers,
                                         n_max_speakers=n_max_speakers)
                sd_engine.write_rttm(rttm_file)
                sd_engine.write_textgrid(textgrid_file, speech_label=speech_label)
                out_textgrid.append(textgrid_file)

        return out_textgrid

    def process_file(self, speech_file, progress=gr.Progress()):
        """Load one speech file, run the task pipeline and save the outputs.

        Args:
            speech_file: Path of the speech file to load.
            progress: Gradio progress tracker.

        Returns:
            ``(speech_file, output_zip_file, wav_file_created,
            out_file_created)``: the path of the saved WAV copy, the path of
            the output archive, and one success flag for each.

        Raises:
            Exception: Whatever ``load_speech_file`` raises is logged and
                re-raised. Failures while saving the WAV copy or the archive
                are logged and reported through the flags instead.
        """
        # Outputs are always named after a generated basename in this version.
        basename = generate_file_basename()

        progress(0, desc="Loading Speech File...")
        logger.info('Loading Speech File...')

        try:
            speech, sr, duration = load_speech_file(speech_file)
        except Exception as e:
            logger.exception('Failed to load the speech file %s, %s', speech_file, e)
            raise

        tasks = set(self.get_tasks())
        task_pipeline = self.speech_grid.create_task_pipeline(tasks, duration)

        # The task list used to be passed as a stray logging argument with no
        # placeholder, so the record failed to format; give it a placeholder.
        logger.info('Start processing, following tasks will be performed on %s %s',
                    basename, ','.join(task_pipeline))

        # Loading task engines
        self.speech_grid.load_tasks(task_pipeline)

        out_textgrid = self.apply_tasks_to_speech(task_pipeline, speech, basename, sr, progress)

        output_zip_file = os.path.join(output_dir, f'{basename}_output.zip')

        progress(1, desc="Generate output files")
        logger.info('Saving output files in %s, %s', output_dir, basename)

        # Save a copy of the loaded speech as WAV (per the original note: 16 kHz,
        # mono, 16-bit; that depends on load_speech_file, confirm there).
        wav_file_created = True
        out_file_created = True
        try:
            speech_file = os.path.join(output_dir, f'{basename}.wav')
            sf.write(speech_file, speech, sr)
        except Exception as e:
            logger.exception('Failed to save the speech file %s, %s', speech_file, e)
            wav_file_created = False

        try:
            zip_files(out_textgrid, output_zip_file)
        except Exception as e:
            logger.exception('Failed to create output archive in %s, %s', output_zip_file, e)
            out_file_created = False

        progress(1, desc=f"Processing {basename} completed..")
        logger.info('Processing %s is completed', basename)

        return (speech_file, output_zip_file, wav_file_created, out_file_created)
        
        

In [None]:
import os
import logging
import soundfile as sf
import gradio as gr

from speechgrid import SpeechGrid
from config import setup_logger, output_dir
from core.utils import generate_file_basename, load_speech_file, zip_files


setup_logger()
logger = logging.getLogger(__name__)

class SpeechGridInterface(SpeechGrid):
    """Gradio-facing front end built on top of SpeechGrid.

    Keeps the list of tasks selected in the UI, runs them on one speech file
    and returns the values the UI needs (status text and download buttons).
    Task engines, the task pipeline, the speech label and the speaker-number
    settings all come from the SpeechGrid base class.
    """

    def __init__(self, config_file):
        """Initialise the SpeechGrid base from config_file with no task selected."""
        super().__init__(config_file=config_file)
        self.tasks = []

    def add_task(self, task):
        """Add task to the selection unless it is already there."""
        if task not in self.tasks:
            self.tasks.append(task)

    def remove_task(self, task):
        """Remove task from the selection; unknown tasks are ignored."""
        if task in self.tasks:
            self.tasks.remove(task)

    def set_tasks(self, tasks):
        """Replace the whole selection (the list is stored as given, not copied)."""
        self.tasks = tasks

    def get_tasks(self):
        """Return the currently selected tasks."""
        return self.tasks

    @staticmethod
    def _download_button(created, file_path, ok_label, error_label):
        """Build the DownloadButton update for one output file.

        When the file was not created the button is shown disabled, without
        a value, and labelled with error_label.
        """
        return gr.DownloadButton(label=ok_label if created else error_label,
                                 value=file_path if created else None,
                                 interactive=created,
                                 visible=True)

    def process(self, path, mode='single', progress=gr.Progress()):
        """Process the speech file at path and return the UI updates.

        Args:
            path: path of the speech file to process.
            mode: only 'single' is implemented.
            progress: Gradio progress tracker forwarded to process_file.

        Returns:
            [status text, output-archive DownloadButton, speech-file
            DownloadButton] for mode 'single'; None for any other mode.
        """
        self.load_tasks(self.get_tasks())

        if mode != 'single':
            # Previously this fell through silently; keep returning None but
            # leave a trace so the missing outputs can be diagnosed.
            logger.warning('Unsupported processing mode %r, nothing was processed', mode)
            return None

        speech_file, output_zip_file, wav_file_created, out_file_created = \
            self.process_file(path, progress=progress)

        return ["Processing completed..",
                self._download_button(out_file_created, output_zip_file,
                                      "Download output file",
                                      "Error in archiving data"),
                self._download_button(wav_file_created, speech_file,
                                      "Download speech file",
                                      "Error in saving speech file")]

    def apply_tasks_to_speech(self, task_pipeline, speech, basename, sr=16000, progress = gr.Progress()):
        """Run every task of task_pipeline on speech, in order.

        Each task writes '<basename>_<TASK>.TextGrid' in output_dir (VAD and SD
        also write an '.rttm' file). When ASR is not the first task it is
        applied to the intervals of the most recently written TextGrid.

        Returns:
            The list of TextGrid paths, in the order they were written.
        """
        out_textgrid = []

        # One extra step is reserved for writing the output files, so the bar
        # never reaches 100% inside this loop.
        num_processes = len(task_pipeline) + 1
        for step, task in enumerate(task_pipeline, start=1):
            logger.info(f'Applying {task}...')
            progress(step/(num_processes+1), desc=f"Applying {task}")

            if task == 'ASR':
                asr_engine = self.loaded_tasks['ASR']
                textgrid_file = os.path.join(output_dir,f'{basename}_ASR.TextGrid')
                if not out_textgrid:
                    asr_engine.process_speech(speech)
                else:
                    input_textgrid = out_textgrid[-1]
                    asr_engine.process_intervals(speech, input_textgrid, sr = sr, offset_sec=0,
                                                 speech_label = self.speech_label)

                asr_engine.write_textgrid(textgrid_file)
                out_textgrid.append(textgrid_file)

            elif task == 'VAD':
                vad_engine = self.loaded_tasks['VAD']
                rttm_file = os.path.join(output_dir,f'{basename}_VAD.rttm')
                textgrid_file = os.path.join(output_dir,f'{basename}_VAD.TextGrid')
                vad_engine.process_speech(speech,sr)
                vad_engine.write_rttm(rttm_file)
                vad_engine.write_textgrid(textgrid_file, speech_label=self.speech_label)
                out_textgrid.append(textgrid_file)

            elif task == 'SD':
                sd_engine = self.loaded_tasks['SD']
                rttm_file = os.path.join(output_dir,f'{basename}_SD.rttm')
                textgrid_file = os.path.join(output_dir,f'{basename}_SD.TextGrid')
                n_exact_speakers, n_min_speakers, n_max_speakers = self.get_speaker_numbers()
                sd_engine.process_speech(speech=speech,
                                         sr=sr,
                                         n_exact_speakers = n_exact_speakers,
                                         n_min_speakers = n_min_speakers,
                                         n_max_speakers = n_max_speakers)
                sd_engine.write_rttm(rttm_file)
                sd_engine.write_textgrid(textgrid_file, speech_label=self.speech_label)
                out_textgrid.append(textgrid_file)

        return out_textgrid

    def process_file(self, speech_file, progress=gr.Progress()):
        """Load one speech file, run the selected tasks and save the outputs.

        Returns:
            (speech_file, output_zip_file, wav_file_created, out_file_created):
            the path of the saved wav copy, the path of the zip archive of the
            TextGrids, and one flag per file telling whether it was written.

        Raises:
            Whatever load_speech_file raises (logged, then re-raised). Failures
            while saving the wav or the archive are logged and reported through
            the flags instead.
        """
        basename = generate_file_basename()

        progress(0, desc="Loading Speech File...")
        logger.info('Loading Speech File...')

        try:
            speech, sr, duration = load_speech_file(speech_file)
        except Exception as e:
            logger.exception(f'Failed to load the speech file {speech_file}, {e}')
            raise

        tasks = set(self.get_tasks())
        task_pipeline = self.create_task_pipeline(tasks, duration)

        # The task list used to be passed as a stray positional argument with
        # no placeholder in the message, which made logging fail to format
        # the record; use explicit %s placeholders instead.
        logger.info('Start processing, following tasks will be performed on %s: %s',
                    basename, ','.join(task_pipeline))

        #Loading task engines
        self.load_tasks(task_pipeline)

        out_textgrid = self.apply_tasks_to_speech(task_pipeline, speech, basename, sr, progress)

        output_zip_file = os.path.join(output_dir,f'{basename}_output.zip')

        progress(1, desc="Generate output files")
        logger.info(f'Saving output files in {output_dir}, {basename}')

        # Save the loaded audio as <basename>.wav
        # (original note: 16k, mono, 16bit — presumably produced by load_speech_file; confirm).
        wav_file_created = True
        out_file_created = True
        try:
            speech_file = os.path.join(output_dir,f'{basename}.wav')
            sf.write(speech_file, speech, sr)
        except Exception as e:
            logger.exception(f'Failed to save the speech file {speech_file}, {e}')
            wav_file_created = False

        try:
            zip_files(out_textgrid, output_zip_file)
        except Exception as e:
            logger.exception(f'Failed to create output archive in {output_zip_file}, {e}')
            out_file_created = False

        progress(1, desc=f"Processing {basename} completed..")
        logger.info(f'Processing {basename} is completed')

        return (speech_file, output_zip_file, wav_file_created, out_file_created)
        
        

In [None]:
import yaml
with open('config.yaml', 'r') as f:
    data = yaml.safe_load(f)

In [None]:
[(k,v) for k,v in data['speechgrid']['speech_recognition']['avail_lang'].items()]

In [None]:
type(data['speechgrid']['speech_recognition']['lang_model'])

In [None]:
speech_grid = SpeechGrid(config_file='config.yaml')

In [None]:
speech_grid.load_asr()
speech_grid.load_sd()
speech_grid.load_vad()

In [None]:
import os
import logging
import soundfile as sf
import gradio as gr

from speechgrid import SpeechGrid
from config import setup_logger, output_dir
from core.utils import generate_file_basename, load_speech_file, zip_files


In [None]:
speech_grid = SpeechGrid(config_file='config.yaml')

In [None]:
import gradio as gr
from config import setup_logger
import logging
import os
from config import output_dir

setup_logger()
logger = logging.getLogger(__name__)
def process_file(speech_file, tasks=('SD', 'ASR'),
                 n_exact_speakers = 0,
                 n_min_speakers = 0,
                 n_max_speakers = 0,
                 progress=gr.Progress()):
    """Run the requested tasks on one speech file and return the UI updates.

    Uses the module-level speech_grid to build the task pipeline, load the
    engines and read the speech label.

    Args:
        speech_file: path of the speech file to process.
        tasks: iterable of task names ('ASR', 'SD', 'VAD').
        n_exact_speakers, n_min_speakers, n_max_speakers: forwarded to the
            speaker-diarization engine.
        progress: Gradio progress tracker.

    Returns:
        [status text, output-archive DownloadButton, speech-file DownloadButton].
        A button is shown disabled with an error label when its file could not
        be written.

    Raises:
        Whatever load_speech_file raises (logged, then re-raised).
    """
    basename = generate_file_basename()

    progress(0, desc="Loading Speech File...")
    logger.info('Loading Speech File...')

    try:
        speech, sr, duration = load_speech_file(speech_file)
    except Exception as e:
        logger.exception(f'Failed to load the speech file {speech_file}, {e}')
        raise

    tasks = set(tasks)
    task_pipeline = speech_grid.create_task_pipeline(tasks, duration)

    # The task list used to be passed as an extra positional argument without
    # a placeholder, which made logging fail to format the record.
    logger.info('Start processing, following tasks will be performed: %s',
                ','.join(task_pipeline))

    #Loading task engines
    speech_grid.load_tasks(task_pipeline)

    out_textgrid = []

    # One extra step is reserved for writing the output files.
    num_processes = len(task_pipeline) + 1

    for step, task in enumerate(task_pipeline, start=1):
        logger.info(f'Applying {task}')
        progress(step/(num_processes+1), desc=f"Applying {task}")

        if task == 'ASR':
            asr_engine = speech_grid.loaded_tasks['ASR']
            textgrid_file = os.path.join(output_dir,f'{basename}_ASR.TextGrid')
            if not out_textgrid:
                asr_engine.process_speech(speech)
            else:
                # Recognise only the intervals of the most recent TextGrid.
                input_textgrid = out_textgrid[-1]
                asr_engine.process_intervals(speech, input_textgrid, sr = sr, offset_sec=0, speech_label = speech_grid.speech_label)

            asr_engine.write_textgrid(textgrid_file)
            out_textgrid.append(textgrid_file)

        elif task == 'VAD':
            vad_engine = speech_grid.loaded_tasks['VAD']
            rttm_file = os.path.join(output_dir,f'{basename}_VAD.rttm')
            textgrid_file = os.path.join(output_dir,f'{basename}_VAD.TextGrid')
            vad_engine.process_speech(speech,sr)
            vad_engine.write_rttm(rttm_file)
            vad_engine.write_textgrid(textgrid_file, speech_label=speech_grid.speech_label)
            out_textgrid.append(textgrid_file)

        elif task == 'SD':
            sd_engine = speech_grid.loaded_tasks['SD']
            rttm_file = os.path.join(output_dir,f'{basename}_SD.rttm')
            textgrid_file = os.path.join(output_dir,f'{basename}_SD.TextGrid')

            sd_engine.process_speech(speech=speech,
                                     sr=sr,
                                     n_exact_speakers = n_exact_speakers,
                                     n_min_speakers = n_min_speakers,
                                     n_max_speakers = n_max_speakers)
            sd_engine.write_rttm(rttm_file)
            sd_engine.write_textgrid(textgrid_file, speech_label=speech_grid.speech_label)
            out_textgrid.append(textgrid_file)

    output_zip_file = os.path.join(output_dir,f'{basename}_output.zip')

    progress(num_processes/(num_processes+1), desc="Generate output files")
    logger.info(f'Saving output files in {output_dir}, {basename}')

    # Save the loaded audio as <basename>.wav
    # (original note: 16k, mono, 16bit — presumably produced by load_speech_file; confirm).
    try:
        speech_file = os.path.join(output_dir,f'{basename}.wav')
        sf.write(speech_file, speech, sr)
        download_speech_enable = True
        download_speech_value = speech_file
        download_speech_label = "Download speech file"
    except Exception as e:
        logger.exception(f'Failed to save the speech file {speech_file}, {e}')
        download_speech_enable = False
        download_speech_value = None
        download_speech_label = "Error in saving speech file"

    try:
        zip_files(out_textgrid, output_zip_file)
        download_data_enable = True
        download_data_value = output_zip_file
        download_data_label = "Download output file"
    except Exception as e:
        logger.exception(f'Failed to create output archive in {output_zip_file}, {e}')
        #Disable Download Data Button
        download_data_enable = False
        download_data_value = None
        download_data_label = "Error in archiving data"

    progress(1, desc="Processing completed..")
    logger.info('Processing is completed')

    return ["Processing completed..",
            gr.DownloadButton(label=download_data_label,
                              value=download_data_value,
                              interactive=download_data_enable,
                              visible=True),

            gr.DownloadButton(label=download_speech_label,
                              value=download_speech_value,
                              interactive=download_speech_enable,
                              visible=True)]

In [None]:
speech_grid_interface = SpeechGridInterface(config_file='config.yaml')

In [None]:
speech_grid_interface.get_tasks()

In [None]:
shareable = False
from core.utils import generate_file_basename, load_speech_file, zip_files
import soundfile as sf
#TODO: Progress bar
#TODO: VAD ########DONE
#TODO: Number of speakers
#TODO: Word alignment
#TODO: Download button  ######DONE
#TODO: Wrap in a Docker container #####DONE
#TODO: Test pyannote offline  ########DONE
#TODO: Logging of errors and info    #########IN PROGRESS
#TODO: ASR with LM  ########DONE
#TODO: Process batch
#TODO: Kaldi ASR
#TODO: MMS ASR
#TODO: Add parameters selection for ASR, SD, VAD
#TODO: Rewrite the textgrid
#TODO: TextGrid code to get the logger and use logging instead of print.
#TODO: Add logging to other packages
#TODO: Use .bin instead of ARPA in LM
#TODO: ASR add the expected words
#TODO: Consider control the offset in interval ASR
#TODO: Add parameters of min silence duration #NEED TO WELL UNDERSTAND THESE PARAMETERS
#TODO: Create set, get for speaker number
#TODO: Name of file as the uploaded file
#TODO: Make process enable after loading or recording

def reset_min_max(exact,min_v,max_v):
    """Clear the min/max speaker counts when an exact count is given.

    The resulting triple is stored on speech_grid_interface and returned so
    the three number boxes can be refreshed.
    """
    exact_given = exact > 0
    new_min = 0 if exact_given else min_v
    new_max = 0 if exact_given else max_v
    speech_grid_interface.set_speaker_numbers(exact, new_min, new_max)
    return exact, new_min, new_max

def validate_min_max(exact, min_v, max_v):
    """Clear the exact speaker count when a min or max count is given.

    When either bound is positive, exact becomes 0 and max is raised to min
    if it was below it. The resulting triple is stored on
    speech_grid_interface and returned for the three number boxes.
    """
    if not (min_v > 0 or max_v > 0):
        # No range requested: pass the values through untouched.
        speech_grid_interface.set_speaker_numbers(exact, min_v, max_v)
        return exact, min_v, max_v

    upper = min_v if min_v > max_v else max_v
    speech_grid_interface.set_speaker_numbers(0, min_v, upper)
    return 0, min_v, upper


# Build the SpeechGrid web UI: a 'Main' tab (audio input, task selection,
# process button, outputs) and an 'Advanced Options' tab (speaker numbers,
# ASR language and language-model switch). All settings are pushed into the
# module-level speech_grid_interface as soon as the user changes them.
with gr.Blocks(title="SpeechGrid", theme=gr.themes.Soft()) as gui:

    with gr.Tab('Main'):

        # Audio comes from the microphone or an upload and is handed to the
        # callback as a file path.
        record_audio = gr.Audio(sources=["microphone","upload"], type="filepath")

        # Each choice is a (label shown, value passed to the callback) pair.
        tasks = gr.CheckboxGroup(choices=[("Speech to Text","ASR"),
                                          ("Speaker Separation","SD"),
                                          ("Speech Detection","VAD")],
                                 value=speech_grid_interface.get_tasks(),
                                 label="Tasks",
                                 info="Apply the following tasks:")
        tasks.input(speech_grid_interface.set_tasks, inputs=tasks)


        process = gr.Button("Process Audio")

        output_text = gr.Textbox(label='Progress', interactive=False)
        # Download buttons start hidden; the process callback returns
        # replacements for them (see process.click below).
        with gr.Row():
            with gr.Column():
                d1 = gr.DownloadButton("Download output", visible=False)
            with gr.Column():
                d2 = gr.DownloadButton("Download speech file", visible=False)



    with gr.Tab('Advanced Options'):
        gr.Markdown(
            """
            ### Speaker Separation
            Number of speakers
            """)
        with gr.Row():
            n_exact = gr.Number(label='Exact')
            n_min = gr.Number(label='Minimum')
            n_max = gr.Number(label='Maximum')

            # The three boxes are both inputs and outputs of their handlers,
            # so editing one can reset the others.
            n_exact.input(reset_min_max,
                           inputs=[n_exact, n_min, n_max],
                          outputs=[n_exact, n_min, n_max])
            n_min.input(validate_min_max,
                        inputs=[n_exact, n_min, n_max],
                        outputs=[n_exact, n_min, n_max])
            n_max.input(validate_min_max,
                        inputs=[n_exact, n_min, n_max],
                        outputs=[n_exact, n_min, n_max])
        gr.Markdown(
            """
            ### Speech to Text
            """
        )
        with gr.Row():
            # Dropdown choices are built from the key/value pairs of the
            # available-language mapping.
            avail_lang = [(k,v) for k,v in speech_grid_interface.get_asr_available_lang().items()]
            lang_drop = gr.Dropdown(label='Language', choices=avail_lang, value=speech_grid_interface.get_asr_lang())
            lang_drop.change(speech_grid_interface.set_asr_lang,
                            inputs=lang_drop)

            is_lm_enabled = speech_grid_interface.get_lm_enable()
            lm_enable = gr.Checkbox(label='Enable Language Model', value=is_lm_enabled, interactive=True)
            lm_enable.change(speech_grid_interface.set_lm_enable,
                            inputs=lm_enable)

    # Wired after both tabs exist; the callback returns the status text and
    # the two download buttons.
    process.click(speech_grid_interface.process, 
                      inputs=record_audio,
                      outputs=[output_text, d1, d2])

# `shareable` is set at the top of this cell.
gui.launch(share=shareable)

In [None]:
speech_grid_interface.speech_grid.config

In [None]:
speech_grid.get_lm_enable()

In [None]:
def load_asr(params=None):
    """Create the wav2vec speech-recognition engine.

    Args:
        params: currently unused.

    Returns:
        The engine built from the module-level ASR_MODEL, device and LM_PATH.

    Raises:
        RuntimeError: if the engine cannot be created; the original error is
            attached as the cause.
    """
    #TODO add to the asr class to read parameters and load the correct model? or a separate function

    try:
        asr_engine = wav2vec_asr.speech_recognition(ASR_MODEL, device= device, lm_model_path=LM_PATH) #This from parameters and has default one #If language not determine use language id
    except Exception as e:
        print(f'Error loading asr model {ASR_MODEL}')
        # Raising a plain string is itself a TypeError in Python 3; raise a
        # real exception and keep the underlying cause.
        raise RuntimeError("Error in loading asr model") from e

    return asr_engine



def load_sd(params=None):
    """Create the pyannote speaker-diarization engine.

    Args:
        params: currently unused.

    Returns:
        The diarizer built for the module-level device.

    Raises:
        RuntimeError: if the diarizer cannot be created; the original error
            is attached as the cause.
    """
    try:
        diarizer = pyannote_sd.speaker_diar(device=device)
    except Exception as e:
        print('Error loading speaker diarization model')
        # Raising an f-string is itself a TypeError in Python 3; raise a real
        # exception and keep the underlying cause.
        raise RuntimeError(f"Error in loading speaker diarization model {e}") from e

    return diarizer



vad_params = {
             'min_duration_off': 0.09791355693027545,
             'min_duration_on': 0.05537587440407595
             }

def load_vad(params=None):
    """Create the pyannote voice-activity-detection pipeline.

    Args:
        params: passed through to the pipeline constructor.

    Returns:
        The pipeline loaded from 'Models/VAD/pytorch_model.bin'.

    Raises:
        RuntimeError: if the pipeline cannot be created; the original error
            is attached as the cause.
    """
    model_path = 'Models/VAD/pytorch_model.bin'
    try:
        vad_pipeline = pyannote_vad.speech_detection(model_path, params)
    except Exception as e:
        print(f'Error loading voice activity detection model {model_path}')
        # Raising a plain string is itself a TypeError in Python 3; raise a
        # real exception and keep the underlying cause.
        raise RuntimeError("Error in loading voice activity detection model") from e
    return vad_pipeline

In [None]:
def process_file(speech_file, tasks=('SD', 'ASR'), parameters=None, progress=gr.Progress()):
    """Run the requested tasks on one speech file (earlier, self-contained version).

    Args:
        speech_file: path of the speech file to process.
        tasks: iterable of task names ('ASR', 'SD', 'VAD').
        parameters: currently unused.
        progress: Gradio progress tracker.

    Returns:
        [message returned by zip_files, output-archive DownloadButton,
        speech-file DownloadButton].
    """
    basename = generate_file_basename()

    speech, sr, duration = load_speech_file(speech_file)

    # Save the loaded audio as <basename>.wav
    # (original note: 16k, mono, 16bit — presumably produced by load_speech_file; confirm).
    speech_file = join(output_dir,f'{basename}.wav')
    sf.write(speech_file, speech, sr)

    tasks = set(tasks)

    if tasks == {'ASR'} and duration > MAX_DUR:
        # Long recording with ASR only: add VAD to split the speech at silences.
        task_pipeline = ['VAD', 'ASR']
    elif tasks == {'SD', 'ASR'}:
        task_pipeline = ['SD', 'ASR']
    elif tasks == {'VAD', 'ASR'}:
        task_pipeline = ['VAD', 'ASR']
    elif tasks == {'VAD', 'ASR', 'SD'}:
        # ASR runs last, so it is applied on the SD output.
        task_pipeline = ['VAD', 'SD', 'ASR']
    else:
        # A single task, or SD and VAD applied separately. Sorted so the
        # order no longer depends on set iteration order.
        task_pipeline = sorted(tasks)

    if 'ASR' in task_pipeline:
        asr_engine = load_asr()

    if 'SD' in task_pipeline:
        diarizer = load_sd()

    if 'VAD' in task_pipeline:
        vad_engine = load_vad(params=vad_params)

    out_textgrid = []
    n_steps = len(task_pipeline) + 1  # the extra step is the archive creation

    for step, task in enumerate(task_pipeline):
        progress(step/n_steps, desc=f"Applying {task}")

        if task == 'ASR':
            textgrid_file = join(output_dir,f'{basename}_ASR.TextGrid')
            if not out_textgrid:
                dTiers_asr = asr_engine.process_speech(speech)
            else:
                # Recognise only the intervals of the most recent TextGrid.
                input_textgrid = out_textgrid[-1]
                dTiers_asr = asr_engine.process_intervals(speech, input_textgrid, sr = sr, offset_sec=0, speech_label = SPEECH_LABEL)

            tm.WriteTxtGrdFromDict(textgrid_file,dTiers_asr,0,duration)
            out_textgrid.append(textgrid_file)

        elif task == 'VAD':
            rttm_file = join(output_dir,f'{basename}_VAD.rttm')
            textgrid_file = join(output_dir,f'{basename}_VAD.TextGrid')
            vad_engine.DoVAD(speech,sr)
            vad_engine.write_rttm(rttm_file)
            vad_engine.write_textgrid(textgrid_file, speech_label=SPEECH_LABEL)
            out_textgrid.append(textgrid_file)

        elif task == 'SD':
            rttm_file = join(output_dir,f'{basename}_SD.rttm')
            textgrid_file = join(output_dir,f'{basename}_SD.TextGrid')
            diarizer.diarize(speech=speech, sr=sr)
            diarizer.write_rttm(rttm_file)
            diarizer.write_textgrid(textgrid_file, speech_label=SPEECH_LABEL)
            out_textgrid.append(textgrid_file)

    output_zip_file = join(output_dir,f'{basename}_output.zip')

    progress(len(task_pipeline)/n_steps, desc="Create output archive")
    p = zip_files(out_textgrid, output_zip_file)

    progress(1, desc=p)

    return [p, gr.DownloadButton(label="Download output file", value=output_zip_file, visible=True),
           gr.DownloadButton(label="Download speech file", value=speech_file, visible=True)]

In [None]:
#TODO: Progress bar
#TODO: VAD ########DONE
#TODO: Number of speakers
#TODO: Word alignment
#TODO: Download button  ######DONE
#TODO: Wrap in a Docker container #####DONE
#TODO: Test pyannote offline  ########DONE
#TODO: Logging of errors and info
#TODO: ASR with LM  ########DONE
#TODO: Process batch
#TODO: Kaldi ASR
#TODO: MMS ASR
#TODO: Parameters for ASR (hotwords, LM/noLM)



# Earlier single-page UI: audio input, task selection, process button and
# two download buttons, all wired to the module-level process_file.
with gr.Blocks(theme=gr.themes.Soft()) as gui:

    # Audio comes from the microphone or an upload and is handed to the
    # callback as a file path.
    record_audio = gr.Audio(sources=["microphone","upload"], type="filepath")

    # Each choice is a (label shown, value passed to the callback) pair.
    tasks = gr.CheckboxGroup(choices=[("Speech to Text","ASR"), ("Speaker Separation","SD"),("Speech Detection","VAD")], label="Tasks", info="Apply the following tasks:")

    process = gr.Button("Process Audio")

    output_text = gr.Textbox(label='Progress', interactive=False)
    # Download buttons start hidden; process_file returns replacements for them.
    with gr.Row():
        with gr.Column():
            d1 = gr.DownloadButton("Download output", visible=False)
        with gr.Column():
            d2 = gr.DownloadButton("Download speech file", visible=False)

    process.click(process_file, inputs=[record_audio, tasks], outputs=[output_text, d1, d2])



gui.launch()

# Draft

In [None]:
import torch

In [None]:
torch.cuda.is_available()

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, AutoProcessor

In [None]:
model = Wav2Vec2ForCTC.from_pretrained('Models/ASR/mms-1b-all/')
processor = AutoProcessor.from_pretrained('Models/ASR/mms-1b-all/')

In [None]:
model.load_adapter(target_lang='eng', local_files_only=True)
processor.tokenizer.set_target_lang("eng")

In [None]:
model_id = "facebook/mms-1b-all"

In [None]:
model

In [None]:
model = Wav2Vec2ForCTC.from_pretrained(model_id)

In [None]:
model.save_pretrained('temp_mms')

In [None]:
processor = AutoProcessor.from_pretrained(model_id)

In [None]:
processor.save_pretrained('temp_mms/')

In [None]:
processor.tokenizer.set_target_lang("eng")

In [None]:
model.load_adapter("eng")

In [None]:
model = Wav2Vec2ForCTC.from_pretrained('Models/ASR/wav2vec2-large-xlsr-53-english/')

In [None]:
processor = Wav2Vec2Processor.from_pretrained('Models/ASR/wav2vec2-large-xlsr-53-english/')

In [None]:
from core.utils import load_speech_file
speech, sr, duration = load_speech_file('output/file_7928_20241008_101915.wav')

In [None]:
duration

In [None]:
inputs = processor(speech, sampling_rate=16_000, return_tensors="pt")

In [None]:
import torch
with torch.no_grad():
    outputs = model(**inputs).logits

In [None]:
ids = torch.argmax(outputs, dim=-1)[0]
transcription = processor.decode(ids)

In [None]:
ids

In [None]:
transcription

In [None]:
from IPython.display import Audio

In [None]:
Audio(data=speech, rate=16000)

In [None]:
!pip install pyctcdecode==0.5.0

In [None]:
from pyctcdecode import build_ctcdecoder
import pyctcdecode

In [None]:
pyctcdecode

In [None]:
lm_path = 'Models/ASR/LM/ngram/4gram_small.arpa.gz'
lm_path_unzip = 'Models/ASR/LM/ngram/4gram_small.arpa'

In [None]:
import gzip
import os, shutil
with gzip.open(lm_path, 'rb') as f_zipped:
    with open(lm_path_unzip, 'wb') as f_unzipped:
        shutil.copyfileobj(f_zipped, f_unzipped)

In [None]:
vocab_dict = processor.tokenizer.get_vocab()
sorted_vocab_dict = {k.lower(): v for k, v in sorted(vocab_dict.items(), key=lambda item: item[1])}

In [None]:
import kenlm

In [None]:
from pyctcdecode import build_ctcdecoder

decoder = build_ctcdecoder(
    labels=list(sorted_vocab_dict.keys()),
    kenlm_model_path=lm_path_unzip,
)


In [None]:
text = decoder.decode(outputs)

In [None]:
# Step 1: unzip the 3-gram ARPA language model, unless already done.
# NOTE(review): lm_gzip_path is not defined in any visible cell — confirm it
# is set before running this cell.
uppercase_lm_path = '3-gram.pruned.1e-7.arpa'
if not os.path.exists(uppercase_lm_path):
    with gzip.open(lm_gzip_path, 'rb') as f_zipped, \
            open(uppercase_lm_path, 'wb') as f_unzipped:
        shutil.copyfileobj(f_zipped, f_unzipped)
    print('Unzipped the 3-gram language model.')
else:
    print('Unzipped .arpa already exists.')

# Step 2: write a lowercased copy of the ARPA file, unless already done.
lm_path = 'lowercase_3-gram.pruned.1e-7.arpa'
if not os.path.exists(lm_path):
    with open(uppercase_lm_path, 'r') as f_upper, \
            open(lm_path, 'w') as f_lower:
        for line in f_upper:
            f_lower.write(line.lower())
    # Only report the conversion when it actually happened.
    print('Converted language model file to lowercase.')

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import torch

In [None]:
model_id = 'Models/ASR/wav2vec2-large-xlsr-53-english/'

In [None]:
processor = Wav2Vec2Processor.from_pretrained(model_id)
model = Wav2Vec2ForCTC.from_pretrained(model_id)

In [None]:
from core.utils import generate_file_basename, load_speech_file, zip_files
from pyctcdecode import build_ctcdecoder

In [None]:
vocab_dict = processor.tokenizer.get_vocab()
sorted_vocab_dict = {k.lower(): v for k, v in sorted(vocab_dict.items(), key=lambda item: item[1])}

In [None]:
lm_path = 'Models/ASR/LM/ngram/4gram_big.arpa.gz'
lm_path_unzip = 'Models/ASR/LM/ngram/4gram_big.arpa'

In [None]:
import gzip
import os, shutil
with gzip.open(lm_path, 'rb') as f_zipped:
    with open(lm_path_unzip, 'wb') as f_unzipped:
        shutil.copyfileobj(f_zipped, f_unzipped)

In [None]:
from pyctcdecode import build_ctcdecoder
#TODO: Create .bin LM

decoder = build_ctcdecoder(
    labels=list(sorted_vocab_dict.keys()),
    kenlm_model_path=lm_path_unzip,  # either .arpa or .bin file
     alpha=0.5,  # tuned on a val set
    beta=1.0,  # tuned on a val set
)


In [None]:
speech, sr, duration = load_speech_file('../tmp/file_6820_20240928_232234_530279.wav')

In [None]:
inputs = processor(speech, sampling_rate=16_000, return_tensors="pt", padding=True)

In [None]:
with torch.no_grad():
    logits = model(inputs.input_values, attention_mask=inputs.attention_mask).logits

In [None]:
predicted_ids = torch.argmax(logits, dim=-1)
predicted_sentences = processor.batch_decode(predicted_ids)

In [None]:
predicted_sentences

In [None]:
x = logits.squeeze()
x = x.cpu().detach().numpy()
hotwords = None #["hello", "second"]
text = decoder.decode(x,
                     hotwords=hotwords,
                     hotword_weight=10.0)

In [None]:
text

In [None]:
text

In [None]:
x = logits.squeeze()
x = x.cpu().detach().numpy()

In [None]:
import numpy as np

In [None]:
np.amax(x, axis=1, keepdims=True)

In [None]:
x.cpu().detach().numpy()

In [None]:
import kenlm

kenlm_model = kenlm.Model(lm_path_unzip)

In [None]:
import pyannote.audio as p

In [None]:
p.__version__

In [None]:
import logging
logging.basicConfig(
    format="{asctime} - {name} - {levelname} - {message}",
    level=logging.DEBUG, style='{'
)

logging.info("This is an informational message")

In [None]:
try:
    u = 8/0;
except:
    logging.exception("Exception...")
    logging.warning("Non critical exception ...", exc_info=True)

In [None]:
def some_function():
    """Emit one DEBUG record through the root logger (logging experiment)."""
    logging.debug("Debug message")

In [None]:
some_function()

In [None]:
import logging

In [None]:
logging.basicConfig()

In [None]:
formatter = logging.Formatter("{asctime} - {levelname}", style='{')

In [None]:
fileHand = logging.FileHandler('test.log', mode='a', encoding='utf-8')

In [None]:
fileHand.setFormatter(formatter)

In [None]:
logger = logging.getLogger('here')

In [None]:
logger.setLevel(logging.DEBUG)

In [None]:
logger.addHandler(fileHand)

In [None]:
logger.info("Lets see!")

In [None]:
logger.getEffectiveLevel()

In [None]:
pwd

In [None]:
i = 0
i +=1

In [None]:
i

In [None]:
logger

In [None]:
import logging

In [None]:
logging.basicConfig(level=logging.DEBUG)

In [None]:
logger = logging.getLogger()

In [None]:
logger.getEffectiveLevel()

In [None]:
logger

In [None]:
logger_a = logging.getLogger('A')

In [None]:
logger_a

In [None]:
logger.setLevel(logging.INFO)

In [None]:
logger

In [None]:
logger_a

In [None]:
logger_a.setLevel(logging.WARN)

In [None]:
logger

In [None]:
import gradio as gr

def reset_min_max(exact,min_v,max_v):
    """Return (exact, min, max) with min and max zeroed when exact is positive."""
    if not exact > 0:
        return exact, min_v, max_v
    return exact, 0, 0

def validate(exact, min_v, max_v):
    """Return (exact, min, max) with exact zeroed when a min or max is given.

    When either bound is positive, max is also raised to min if it was
    below it; otherwise the three values are returned unchanged.
    """
    if not (min_v > 0 or max_v > 0):
        return exact, min_v, max_v
    upper = min_v if min_v > max_v else max_v
    return 0, min_v, upper
  
def toggle_it():
    """Placeholder handler, not implemented yet.

    The definition had no body, which made the whole cell a syntax error.
    """
    return None

# Experiment: three linked number boxes. Each box is both an input and an
# output of the handlers, so changing one can reset the others.
with gr.Blocks() as gui:
    x = gr.Button(visible=True)
    a = gr.Number(label='Exact', interactive=True)
    b = gr.Number(label='Min', interactive=True)
    c = gr.Number(label='Max', interactive=True)




    # A positive 'Exact' clears 'Min' and 'Max'.
    a.change(reset_min_max,inputs=[a,b,c],outputs=[a,b,c])
    #b.change(reset_exact, inputs=[b,a],outputs=a)
    #c.change(reset_exact, inputs=[c,a],outputs=a)
    # A positive 'Min' or 'Max' clears 'Exact' and keeps Max >= Min.
    b.change(validate, inputs=[a,b,c], outputs=[a,b,c])
    c.change(validate, inputs=[a,b,c], outputs=[a,b,c])


gui.launch()

In [None]:
x = {'a':'b','v':'h'}

In [None]:
'b' in x

In [None]:
f = True
T = ['ASR']
L = ['ASR','SD']

if 'ASR' in T and ('ASR' not in L or f):
    print('True')

In [None]:
import gradio as gr


def greet(name='f',a='b'):
    """Return the greeting 'Hello <name>!'; the second argument is ignored."""
    greeting = "Hello " + name
    return greeting + "!"


# Minimal Blocks demo: clicking the button sends the 'Name' text to greet()
# and shows the result in the output box. greet's second parameter is not
# wired to any input, so its default is used.
with gr.Blocks() as demo:
    name = gr.Textbox(label="Name")
    output = gr.Textbox(label="Output Box")
    greet_btn = gr.Button("Greet")
    greet_btn.click(fn=greet, inputs=name, outputs=output, api_name="greet")

if __name__ == "__main__":
    demo.launch()


In [None]:
class EmptyClass:
    # Deliberately empty: used in the next cell to inspect with dir() what
    # an instance of a class with no members exposes.
    pass

In [None]:
c = EmptyClass()
dir(c)

In [None]:
x = 100

def print_x():
    """Print the module-level x, looked up when the function is called."""
    print(x)

In [None]:
print_x()

In [1]:
import gradio as gr

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
x = 5


    
# gr.State experiment: keep the last number typed by the user in a State.
with gr.Blocks() as gui:
    f_s = gr.State(x)  # x is only the initial value of the state
    x_in = gr.Number()

    def change_x(x_in, f):
        """Return the new state value for the number just entered.

        x_in is the number typed by the user and f is the current state
        value. A State is updated through the handler's return value: the
        previous version assigned a local `x` (dead code that never touched
        the module-level x) and returned `f`, so the state always stayed at
        its initial value.
        """
        # NOTE(review): the module-level x and f_s.value are expected to
        # keep the initial value (the cell outputs below still show 5);
        # the updated value is what later handlers receive through f_s —
        # confirm against the Gradio State documentation.
        return x_in

    x_in.input(change_x, inputs=[x_in, f_s], outputs=f_s)

gui.launch()

Running on local URL:  http://127.0.0.1:7863

To create a public link, set `share=True` in `launch()`.




--------


1
2


In [9]:
x

5

In [10]:
f_s.value

5