In [None]:
from denoise import denoise_audio
from diarisation import diarization_wav2vec2
from enregistrement import record_loop
from normalize import normalize_volume
from transcribe import transcribe_w2v2
from vad import VAD

from pathlib import Path
from huggingface_hub import login
from pyannote.audio import Model
from pyannote.audio.pipelines import VoiceActivityDetection
import torchaudio
import sounddevice as sd
import soundfile as sf
import numpy as np
import noisereduce as nr
import torch
from transformers import AutoModelForCTC, Wav2Vec2Processor
import os
import json
from datetime import datetime, timedelta

# Run inference on the first CUDA GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# French wav2vec2 CTC checkpoint: the processor prepares the model inputs and
# decodes the predicted ids, the model produces the CTC logits.
processor = Wav2Vec2Processor.from_pretrained("bhuang/asr-wav2vec2-french")
model_ctc = AutoModelForCTC.from_pretrained("bhuang/asr-wav2vec2-french").to(device)
# Sampling rate of the processor's feature extractor; audio is resampled to it
# before transcription.
model_sample_rate = processor.feature_extractor.sampling_rate

# pyannote segmentation model wrapped in a voice-activity-detection pipeline.
model_segm = Model.from_pretrained("pyannote/segmentation-3.0")
pipeline = VoiceActivityDetection(segmentation=model_segm)

In [33]:
def record_loop(duration, bruit_reduction=True, samplerate=16000):
    """
    Record consecutive mono audio chunks from a `sounddevice` input stream.

    Each chunk lasts about `duration` seconds and is written to
    ``tests/record_chunk_<k>_<timestamp>.wav``. After every chunk the list of
    ``{"start_time", "end_time", "filename"}`` entries is rewritten to
    ``tests/audio_brut.json``; a log left over from a previous run is deleted
    first. Recording never stops by itself: it runs until it is interrupted
    (Ctrl+C / kernel interrupt), and the chunk in progress at that moment is
    saved as well.

    Args:
        duration (float): Length of each chunk in seconds. Must be > 0.
        bruit_reduction (bool): Apply `noisereduce` to each chunk before saving.
        samplerate (int): Sampling rate in Hz used for capture and for the WAV files.

    Raises:
        ValueError: If `duration` is not strictly positive.
    """
    if duration <= 0:
        raise ValueError("duration must be a positive number of seconds")

    os.makedirs("tests", exist_ok=True)
    log_path = os.path.join("tests", "audio_brut.json")

    # Start from a clean log so entries of a previous session do not leak in.
    if os.path.exists(log_path):
        os.remove(log_path)

    logs = []
    # State of the chunk being captured. It is initialised before the `try`
    # so the interrupt handler can always inspect it safely.
    frames = []
    start_time = None
    filename = None
    k = 1

    def write_log():
        # Rewrite the whole JSON log from the in-memory list.
        with open(log_path, "w", encoding="utf-8") as f:
            json.dump(logs, f, indent=4, ensure_ascii=False)

    def flush_chunk():
        # Save the pending frames as a WAV file and log them; no-op when
        # nothing is pending.
        if not frames:
            return

        recording = np.concatenate(frames, axis=0).squeeze()
        if bruit_reduction:
            recording = nr.reduce_noise(y=recording, sr=samplerate)

        sf.write(os.path.join("tests", filename), recording, samplerate)
        end_time = datetime.now()

        logs.append({
            "start_time": start_time.strftime("%Y-%m-%d %H:%M:%S"),
            "end_time": end_time.strftime("%Y-%m-%d %H:%M:%S"),
            "filename": filename
        })
        # Drop the frames as soon as the chunk is logged: an interrupt that
        # arrives later must not save and log the same chunk a second time.
        frames.clear()
        write_log()

    print("Parlez (Ctrl+C pour arr√™ter).")

    try:
        with sd.InputStream(samplerate=samplerate, channels=1, dtype='float32') as stream:
            while True:
                start_time = datetime.now()
                safe_time = start_time.strftime("%Y%m%d_%H%M%S")
                filename = f"record_chunk_{k}_{safe_time}.wav"

                # Pull fixed-size blocks until the chunk's wall-clock duration has elapsed.
                while (datetime.now() - start_time).total_seconds() < duration:
                    frames.append(stream.read(1024)[0])

                flush_chunk()
                k += 1

    except KeyboardInterrupt:
        print("\nArr√™t demand√© par l'utilisateur (Ctrl+C).")

        # Save the partial chunk, if any frames were captured since the last save.
        flush_chunk()
        # The interrupt may have hit while the log was being written: rewrite
        # it so the file on disk is complete.
        if logs:
            write_log()


In [34]:
# Capture consecutive 5-second chunks with noise reduction into tests/ until
# the kernel is interrupted (Ctrl+C).
record_loop(5, bruit_reduction=True)

Parlez (Ctrl+C pour arr√™ter).

Arr√™t demand√© par l'utilisateur (Ctrl+C).


In [35]:
def VADe(audio_path, min_duration_on=2.0, min_duration_off=2.0):
    """
    Split a raw recording into speech segments using the module-level VAD `pipeline`.

    The recording's start time is looked up in ``tests/audio_brut.json``. Every
    detected speech segment is saved as ``tmp/<stem>_segment_<i>.wav`` and an
    entry with its absolute start/end time (whole seconds) is added to
    ``tmp/transcriptions_log.json``, which is kept sorted by start time.
    Finally the recording is removed from ``tests/audio_brut.json`` and the
    raw audio file is deleted.

    If the recording has no entry in ``tests/audio_brut.json`` a message is
    printed and nothing is processed or deleted.

    Args:
        audio_path (str | pathlib.Path): Raw recording to segment.
        min_duration_on (float): Passed to the pipeline as ``min_duration_on``;
            speech regions shorter than this many seconds are dropped.
        min_duration_off (float): Passed to the pipeline as ``min_duration_off``;
            pauses shorter than this many seconds are filled in, merging the
            neighbouring speech regions.

    Returns:
        None
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    audio_path = Path(audio_path)
    filename_brut = audio_path.name
    raw_log_path = os.path.join("tests", "audio_brut.json")
    log_path = os.path.join("tmp", "transcriptions_log.json")

    # Find the recording's metadata first: without its start time the segments
    # cannot be placed on the absolute timeline.
    try:
        with open(raw_log_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        data = []

    raw_entry = next((item for item in data if item.get("filename") == filename_brut), None)
    if raw_entry is None:
        print("Aucun enregistrement trouv√© pour ce fichier.")
        return

    base_start = datetime.strptime(raw_entry["start_time"], time_format)

    pipeline.instantiate({
        "min_duration_on": min_duration_on,
        "min_duration_off": min_duration_off,
    })
    vad = pipeline(audio_path)

    # Segments and the log are written to tmp/, which may not exist yet.
    os.makedirs("tmp", exist_ok=True)

    # Load the existing segment log, falling back to an empty one when the
    # file is missing or unreadable.
    logs = []
    if os.path.exists(log_path):
        with open(log_path, "r", encoding="utf-8") as f:
            try:
                logs = json.load(f)
            except json.JSONDecodeError:
                logs = []

    waveform, sr = torchaudio.load(audio_path)
    stem = audio_path.stem

    for i, (segment, _, _) in enumerate(vad.itertracks(yield_label=True), start=1):
        print(segment)

        # Segment bounds are in seconds; convert them to sample indices.
        first_sample = int(segment.start * sr)
        last_sample = int(segment.end * sr)
        clip = waveform[:, first_sample:last_sample]

        segment_filename = f"{stem}_segment_{i}.wav"
        torchaudio.save(os.path.join("tmp", segment_filename), clip, sr)

        # Absolute timestamps are kept at whole-second resolution.
        absolute_start = base_start + timedelta(seconds=int(segment.start))
        absolute_end = base_start + timedelta(seconds=int(segment.end))

        logs.append({
            "start_time": absolute_start.strftime(time_format),
            "end_time": absolute_end.strftime(time_format),
            "filename": segment_filename
        })

    # Keep the log in chronological order, then write it once.
    logs.sort(key=lambda x: datetime.strptime(x["start_time"], time_format))
    with open(log_path, "w", encoding="utf-8") as f:
        json.dump(logs, f, indent=4, ensure_ascii=False)

    # The recording is fully processed: drop it from the raw log and from disk.
    data = [item for item in data if item.get("filename") != filename_brut]
    with open(raw_log_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)

    os.remove(audio_path)
    


In [36]:
folder = Path("tests")

# VADe deletes each recording once it has been segmented, so iterate over a
# sorted snapshot of the directory rather than a live glob generator; this
# also makes the processing order deterministic.
for audio_path in sorted(folder.glob("*.wav")):
    VADe(audio_path, min_duration_on=1, min_duration_off=2)

[ 00:00:00.030 -->  00:00:04.131]


  s = torchaudio.io.StreamWriter(uri, format=muxer, buffer_size=buffer_size)


In [37]:

def transcribe_w2v2(audio_path):
    """
    Transcribe one audio segment with the wav2vec2 CTC model and store the text in the log.

    The segment must already have an entry (matched on its file name) in
    ``tmp/transcriptions_log.json``. The audio is down-mixed to mono,
    resampled to `model_sample_rate` when needed, run through `model_ctc`
    and decoded greedily (argmax); the resulting sentence is stored under the
    entry's ``"transcription"`` key and the log is rewritten.

    Nothing is transcribed, and a message is printed instead, when the entry
    already has a transcription or when the file has no entry in the log.

    Args:
        audio_path (str | pathlib.Path): Audio segment to transcribe.

    Returns:
        None
    """
    audio_path = Path(audio_path)
    filename_brut = audio_path.name
    log_path = os.path.join("tmp", "transcriptions_log.json")

    # Load the segment log, falling back to an empty one when the file is
    # missing or unreadable.
    logs = []
    if os.path.exists(log_path):
        with open(log_path, "r", encoding="utf-8") as f:
            try:
                logs = json.load(f)
            except json.JSONDecodeError:
                logs = []

    entry = next((item for item in logs if item.get("filename") == filename_brut), None)

    if entry is None:
        # Check before running inference: there is no entry to attach the
        # transcription to, so the work would be wasted.
        print(f"Pas de segment {filename_brut} dans {log_path} : fichier non transcrit.")
        return

    if "transcription" in entry:
        # Already transcribed; do not run the model again.
        print(f"Transcription d√©j√† pr√©sente pour {filename_brut}")
        return

    waveform, sample_rate = torchaudio.load(audio_path)
    # Down-mix to mono by averaging channels; for a single-channel file this
    # just drops the channel dimension.
    waveform = waveform.mean(dim=0)

    # Resample to the rate expected by the processor.
    if sample_rate != model_sample_rate:
        resampler = torchaudio.transforms.Resample(sample_rate, model_sample_rate)
        waveform = resampler(waveform)

    # Build the model inputs.
    input_dict = processor(waveform, sampling_rate=model_sample_rate, return_tensors="pt")

    with torch.inference_mode():
        logits = model_ctc(input_dict.input_values.to(device)).logits

    # Greedy CTC decoding: most likely token per frame, then collapse to text.
    predicted_ids = torch.argmax(logits, dim=-1)
    predicted_sentence = processor.batch_decode(predicted_ids)[0]

    # `entry` is the dict stored inside `logs`, so this updates the log in place.
    entry["transcription"] = predicted_sentence

    with open(log_path, "w", encoding="utf-8") as f:
        json.dump(logs, f, ensure_ascii=False, indent=4)


In [38]:
folder = Path("tmp")

# Transcribe every WAV segment found in tmp/; segments whose log entry already
# has a transcription are skipped by transcribe_w2v2 itself.
for audio_path in folder.glob("*.wav"):
    transcribe_w2v2(audio_path)