In [1]:
import torch
import tempfile
import torchaudio
from transformers import pipeline
from datasets import load_dataset
import os
import gc

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Run inference on the GPU when one is available, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Half precision is meant for GPU inference; on CPU float16 can be unsupported
# or much slower, so fall back to float32 there.
torch_dtype = torch.float16 if device == "cuda" else torch.float32

# Shared Whisper speech-recognition pipeline used by the cells below.
asr_pipeline = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-large-v3",
    chunk_length_s=20,  # process long recordings in 20-second chunks
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [7]:
def transcribe_audio_file(audio_bytes: bytes, original_filename: str, return_timestamps: bool = False):
    """Transcribe an in-memory audio file with the module-level ``asr_pipeline``.

    The bytes are written to a temporary file (keeping the original file
    extension) so ``torchaudio`` can decode them. The decoded audio is
    resampled to 16 kHz, averaged down to one channel and handed to the
    pipeline with the language fixed to Russian.

    Args:
        audio_bytes: Raw contents of the audio file.
        original_filename: Name of the source file; only its extension is
            used, as the suffix of the temporary file.
        return_timestamps: Forwarded to the pipeline; also selects which part
            of the pipeline result is returned.

    Returns:
        The ``"chunks"`` entry of the pipeline result when
        ``return_timestamps`` is true, otherwise its ``"text"`` entry.
    """
    target_rate = 16000
    _, suffix = os.path.splitext(original_filename)
    suffix = suffix.lower()

    with tempfile.NamedTemporaryFile(suffix=suffix, delete=True) as tmp_file:
        tmp_file.write(audio_bytes)
        # Make sure everything is on disk before torchaudio reopens the file by name.
        tmp_file.flush()

        samples, source_rate = torchaudio.load(tmp_file.name)

        if source_rate != target_rate:
            samples = torchaudio.functional.resample(samples, source_rate, target_rate)

        channel_count = samples.shape[0]
        if channel_count > 1:
            # Down-mix multi-channel audio by averaging the channels.
            samples = samples.mean(dim=0, keepdim=True)

        transcription = asr_pipeline(
            samples.squeeze().numpy(),
            batch_size=8,
            return_timestamps=return_timestamps,
            generate_kwargs={"language": "ru"},
        )

        if return_timestamps:
            return transcription["chunks"]
        return transcription["text"]

In [9]:
import yt_dlp
def download_audio_from_youtube(url: str) -> tuple[bytes, str]:
    """Download the audio track of a video and return it as MP3 bytes.

    The download goes into a temporary directory that is removed when the
    function returns, so nothing is left on disk.

    Args:
        url: Address of the video to download.

    Returns:
        A ``(audio_bytes, file_name)`` tuple: the contents of the extracted
        MP3 file and its base name (``<title>.mp3``).
    """
    # Post-processing step that converts the downloaded stream to MP3.
    mp3_postprocessor = {
        "key": "FFmpegExtractAudio",
        "preferredcodec": "mp3",
        "preferredquality": "192",
    }

    with tempfile.TemporaryDirectory() as workdir:
        options = {
            "format": "bestaudio/best",
            "outtmpl": os.path.join(workdir, "%(title)s.%(ext)s"),
            "noplaylist": True,
            "quiet": True,
            "postprocessors": [mp3_postprocessor],
        }

        with yt_dlp.YoutubeDL(options) as downloader:
            info = downloader.extract_info(url, download=True)
            # prepare_filename() reports the pre-conversion name; swap its
            # extension for the ".mp3" produced by the post-processor.
            stem = downloader.prepare_filename(info).rsplit(".", 1)[0]
            mp3_path = stem + ".mp3"

        with open(mp3_path, "rb") as mp3_file:
            payload = mp3_file.read()

        return payload, os.path.basename(mp3_path)


In [None]:
# Interactive demo: ask for a video URL, download its audio and print the
# transcription twice - once as plain text and once as timestamped chunks.
if __name__ == "__main__":
    # Disabled alternative input path: read the audio from a local file
    # instead of downloading it.
    # with open(filename, "rb") as f:
    #     audio = f.read()
    youtube_url = input("ссылка на ют ").strip()

    print("\nскачивание...")
    audio, filename = download_audio_from_youtube(youtube_url)

    # NOTE(review): the same audio is transcribed twice below, one full
    # pipeline run per output mode.
    print("\nТОЛЬКО ТЕКСТ-------")
    text = transcribe_audio_file(audio, filename, return_timestamps=False)
    print(text)

    print("\nТЕКСТ С ТАЙМКОДАМИ---------")
    chunks = transcribe_audio_file(audio, filename, return_timestamps=True)
    # Print one chunk per line.
    for chunk in chunks:
        print(chunk)

In [9]:
import os
import yt_dlp
from pathlib import Path

def download_youtube_audio(url: str):
    """Download a video's audio track as an MP3 into the user's Downloads folder.

    The file is saved as ``<title>.mp3`` under ``~/Downloads``. Progress
    messages are printed before and after the download, and the downloader's
    own output is left enabled.

    Args:
        url: Address of the video to download.
    """
    target_dir = str(Path.home() / "Downloads")

    # Post-processing step that converts the downloaded stream to MP3.
    extract_mp3 = {
        "key": "FFmpegExtractAudio",
        "preferredcodec": "mp3",
        "preferredquality": "192",
    }
    options = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(target_dir, "%(title)s.%(ext)s"),
        "noplaylist": True,
        "quiet": False,
        "postprocessors": [extract_mp3],
    }

    with yt_dlp.YoutubeDL(options) as downloader:
        print(f"📥 Скачиваем {url} в MP3...")
        downloader.download([url])
        print(f"✅ Скачано в: {target_dir}")

# Interactive entry point: prompt for a video URL and download its audio.
if __name__ == "__main__":
    # Strip surrounding whitespace from the pasted link before using it.
    youtube_url = input("ссылка на ютуб: ").strip()
    download_youtube_audio(youtube_url)


📥 Скачиваем https://youtu.be/ozmAG58n12k?si=DR7nurcTa14Zy5WK в MP3...
[youtube] Extracting URL: https://youtu.be/ozmAG58n12k?si=DR7nurcTa14Zy5WK
[youtube] ozmAG58n12k: Downloading webpage
[youtube] ozmAG58n12k: Downloading tv client config
[youtube] ozmAG58n12k: Downloading tv player API JSON
[youtube] ozmAG58n12k: Downloading ios player API JSON
[youtube] ozmAG58n12k: Downloading player 461f4c95-main
[youtube] ozmAG58n12k: Downloading m3u8 information
[info] ozmAG58n12k: Downloading 1 format(s): 251
[download] Destination: /home/deniska/Downloads/Лучший диалог за всю историю кинематографа.webm
[download] 100% of  436.84KiB in 00:00:00 at 731.85KiB/s 
[ExtractAudio] Destination: /home/deniska/Downloads/Лучший диалог за всю историю кинематографа.mp3
Deleting original file /home/deniska/Downloads/Лучший диалог за всю историю кинематографа.webm (pass -k to keep)
✅ Скачано в: /home/deniska/Downloads


```bash
pip install git+https://github.com/huggingface/speechbox
pip install pyannote.audio torch torchaudio
```

```python
from speechbox import ASRDiarizationPipeline
from transformers import pipeline as hf_pipeline
from pyannote.audio import Pipeline as DiarizationPipeline
import tempfile
import torchaudio
import torch
import os
import gc
from config import HF_TOKEN  


# Pick the device once and derive the dtype from it: half precision is meant
# for GPU inference, while on CPU float16 can be unsupported or much slower.
device = "cuda" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if device == "cuda" else torch.float32

# Whisper speech-recognition pipeline (timestamps are needed to align the
# transcript with speaker turns).
asr_pipeline = hf_pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-large-v3",
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

# Speaker-diarization pipeline; the model is loaded with the Hugging Face
# access token imported from the local config module.
# NOTE(review): the diarization pipeline is not moved to `device` here, so it
# presumably runs on the CPU - confirm whether a `.to(torch.device(device))`
# call is wanted.
diarization_pipeline = DiarizationPipeline.from_pretrained(
    "pyannote/speaker-diarization-3.0",
    use_auth_token=HF_TOKEN,
)

# Combines transcription and diarization into speaker-attributed segments.
combined_pipeline = ASRDiarizationPipeline(
    asr_pipeline=asr_pipeline,
    diarization_pipeline=diarization_pipeline,
)

def _format_as_dialogue(segments):
    """Render speaker segments as "Спикер N: text" lines, numbering speakers by first appearance."""
    speaker_labels = {}
    dialogue_lines = []

    for seg in segments:
        speaker = seg["speaker"]
        if speaker not in speaker_labels:
            speaker_labels[speaker] = f"Спикер {len(speaker_labels) + 1}"
        dialogue_lines.append(f"{speaker_labels[speaker]}: {seg['text'].strip()}")

    return "\n".join(dialogue_lines)


def transcribe_audio_file(audio_bytes: bytes, original_filename: str):
    """Transcribe an in-memory audio file and attribute the text to speakers.

    The bytes are written to a temporary file (keeping the original file
    extension) so ``torchaudio`` can decode them. The audio is resampled to
    16 kHz, averaged down to one channel and passed to the module-level
    ``combined_pipeline``.

    Args:
        audio_bytes: Raw contents of the audio file.
        original_filename: Name of the source file; only its extension is
            used, as the suffix of the temporary file.

    Returns:
        A dict with three keys:
            ``"full_text"``: all segment texts joined with single spaces.
            ``"speaker_chunks"``: the segments exactly as returned by
                ``combined_pipeline``.
            ``"dialogue_text"``: one "Спикер N: text" line per segment.
    """
    target_sample_rate = 16000
    ext = os.path.splitext(original_filename)[1].lower()

    # The temporary file is only needed for decoding; release it before the
    # (slow) inference step.
    with tempfile.NamedTemporaryFile(suffix=ext, delete=True) as temp_audio:
        temp_audio.write(audio_bytes)
        temp_audio.flush()
        waveform, sample_rate = torchaudio.load(temp_audio.name)

    if sample_rate != target_sample_rate:
        waveform = torchaudio.functional.resample(waveform, sample_rate, target_sample_rate)
        # Keep the declared rate in sync with the resampled data; passing the
        # original rate along with 16 kHz samples would mislabel the audio.
        sample_rate = target_sample_rate

    if waveform.shape[0] > 1:
        # Down-mix multi-channel audio by averaging the channels.
        waveform = waveform.mean(dim=0, keepdim=True)

    # The combined pipeline takes raw audio as a 1-D numpy array together with
    # its sampling rate ({"raw": ..., "sampling_rate": ...}).
    mono_audio = waveform.squeeze(0).float().numpy()

    outputs = combined_pipeline({
        "raw": mono_audio,
        "sampling_rate": sample_rate,
    })

    return {
        "full_text": " ".join(seg["text"].strip() for seg in outputs),
        "speaker_chunks": outputs,
        "dialogue_text": _format_as_dialogue(outputs),
    }
```