In [4]:
!pip install yt-dlp pydub openai-whisper

Collecting yt-dlp
  Downloading yt_dlp-2024.12.6-py3-none-any.whl.metadata (172 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/172.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m172.1/172.1 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m800.5/800.5 kB[0m [31m29.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper)
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting triton>=2.0.0 (from openai-whisper)
  Downloading t

In [5]:
import yt_dlp
from pydub import AudioSegment
import whisper
import os
import json
import time

from google.colab import drive
drive.flush_and_unmount()
drive.mount('/content/drive')

Drive not mounted, so nothing to flush and unmount.
Mounted at /content/drive


In [6]:
import os
print(os.path.exists("/content/drive/MyDrive"))
print(os.access("/content/drive/MyDrive", os.W_OK))


True
True


In [7]:

# Load the Whisper model
model = whisper.load_model("turbo")


100%|█████████████████████████████████████| 1.51G/1.51G [00:16<00:00, 98.0MiB/s]
  checkpoint = torch.load(fp, map_location=device)


In [8]:

# Define Google Drive paths
drive_base_path = "/content/drive/MyDrive/SpeechDownloads"
downloads_path = os.path.join(drive_base_path, "downloads")
transcriptions_path = os.path.join(drive_base_path, "transcriptions")
split_path = os.path.join(drive_base_path, "split")
download_log = os.path.join(drive_base_path, "download_log.json")

# Ensure output directories exist
os.makedirs(downloads_path, exist_ok=True)
os.makedirs(transcriptions_path, exist_ok=True)
os.makedirs(split_path, exist_ok=True)

In [9]:
def load_download_log():
    """Load the list of already downloaded files."""
    if os.path.exists(download_log):
        with open(download_log, "r") as f:
            return set(json.load(f))
    return set()

def save_download_log(downloaded_files):
    """Save the list of downloaded files."""
    with open(download_log, "w") as f:
        json.dump(list(downloaded_files), f)

def download_audio(query, num_videos=500, batch_size=100, retries=3):
    """Downloads audio from YouTube in batches with retry logic, filtering by video length."""
    downloaded_files = load_download_log()

    for start in range(0, num_videos, batch_size):
        batch_query = f"{query} {start}..{start + batch_size - 1}"
        print(f"Downloading batch {start + 1} to {start + batch_size}...")

        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3'}],
            'default_search': f'ytsearch{batch_size}',
            'postprocessor_args': [],
            'noplaylist': False,
            'outtmpl': os.path.join(downloads_path, '%(title)s.%(ext)s')
        }

        attempt = 0
        while attempt < retries:
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    entries = ydl.extract_info(batch_query, download=False)['entries']

                    for query_result in entries:
                        # Get video duration in seconds
                        duration = query_result.get('duration', 0)

                        # Skip videos that are less than 5 minutes or longer than 30 minutes
                        if duration < 300 or duration > 1800:
                            print(f"Skipping video: {query_result['title']} (Duration: {duration}s)")
                            continue

                        file_name = f"{query_result['title']}.mp3"
                        if file_name in downloaded_files:
                            print(f"Skipping already downloaded file: {file_name}")
                            continue

                        ydl.download([query_result['webpage_url']])
                        downloaded_files.add(file_name)
                        save_download_log(downloaded_files)

                # Break out of the retry loop if successful
                break

            except yt_dlp.utils.DownloadError as e:
                attempt += 1
                print(f"DownloadError occurred. Retrying... (Attempt {attempt}/{retries})")
                time.sleep(5)  # Wait before retrying
                if attempt == retries:
                    print(f"Failed to download batch {start + 1} to {start + batch_size} after {retries} attempts.")
                    continue


In [10]:
download_audio("تعليم اللغة العربية", num_videos=45, batch_size=2, retries=3)

In [11]:
download_audio("نشرات الأخبار باللغة العربي", num_videos=45, batch_size=2, retries=3)

In [29]:
download_audio("روايات صوتية", num_videos=10, batch_size=1, retries=3)

Downloading batch 1 to 1...
[generic] Extracting URL: روايات صوتية 0..0
[youtube:search] Extracting URL: ytsearch1:روايات صوتية 0..0
[download] Downloading playlist: روايات صوتية 0..0
[youtube:search] query "روايات صوتية 0..0": Downloading web client config
[youtube:search] query "روايات صوتية 0..0" page 1: Downloading API JSON
[youtube:search] Playlist روايات صوتية 0..0: Downloading 1 items of 1
[download] Downloading item 1 of 1
[youtube] Extracting URL: https://www.youtube.com/watch?v=EbVmUb9s9vw
[youtube] EbVmUb9s9vw: Downloading webpage
[youtube] EbVmUb9s9vw: Downloading ios player API JSON
[youtube] EbVmUb9s9vw: Downloading mweb player API JSON
[youtube] EbVmUb9s9vw: Downloading m3u8 information
[download] Finished downloading playlist: روايات صوتية 0..0
Skipping video: رواية مسموعة | الليالي البيضاء - دوستويفسكي (من أفضل أعماله) (Duration: 8645s)
Downloading batch 2 to 2...
[generic] Extracting URL: روايات صوتية 1..1
[youtube:search] Extracting URL: ytsearch1:روايات صوتية 1..1
[

In [30]:

def split_audio(file_path, output_dir, segment_duration=15 * 60 * 1000):
    """Splits an MP3 audio file into 15-minute segments and saves them to a specified directory."""
    audio = AudioSegment.from_mp3(file_path)
    segments = [audio[i:i + segment_duration] for i in range(0, len(audio), segment_duration)]
    segment_paths = []

    for idx, segment in enumerate(segments):
        segment_name = f"{os.path.splitext(os.path.basename(file_path))[0]}_part{idx + 1}.mp3"
        segment_path = os.path.join(output_dir, segment_name)
        segment.export(segment_path, format="mp3")
        segment_paths.append(segment_path)

    return segment_paths


def transcribe_audio(file_path):
    """Transcribes an MP3 file using Whisper's base model in Arabic."""
    result = model.transcribe(file_path, language="ar")
    return result["text"]


def transcribe_existing_videos():
    """Processes and transcribes all audio files already downloaded."""
    downloaded_files = load_download_log()

    for file_name in os.listdir(downloads_path):
        file_path = os.path.join(downloads_path, file_name)
        if file_name not in downloaded_files:
            print(f"Processing {file_name}...")

            # Split audio into 15-minute segments
            segments = split_audio(file_path, split_path)

            # Transcribe each segment
            for idx, segment_path in enumerate(segments):
                transcription = transcribe_audio(segment_path)

                # Save transcription to the transcriptions directory
                transcription_file = os.path.join(
                    transcriptions_path, f"{file_name}_part{idx + 1}.txt"
                )
                with open(transcription_file, "w", encoding="utf-8") as f:
                    f.write(transcription)

                print(f"Transcription for {file_name} part {idx + 1} saved.")


        else:
            print(f"Skipping already processed file: {file_name}")



In [31]:
transcribe_existing_videos()

Processing (15) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸01⧸19م.mp3...
Transcription for (15) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸01⧸19م.mp3 part 1 saved.
Processing (37) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸06⧸29م.mp3...
Transcription for (37) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸06⧸29م.mp3 part 1 saved.
Processing (17) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸02⧸02م.mp3...
Transcription for (17) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸02⧸02م.mp3 part 1 saved.
Processing (49) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸09⧸20م.mp3...
Transcription for (49) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸09⧸20م.mp3 part 1 saved.
Processing (19) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸02⧸16م.mp3...
Transcription for (19) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸02⧸16م.mp3 part 1 saved.
Processing (42) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸08⧸03م.mp3...
Transcription for (42) نشرة أخبار اللغة العربية الأسبوعية - 2018⧸08⧸03م.mp3 part 1 saved.
Proc

In [15]:
# Paths to directories
audio_dir = "/content/drive/MyDrive/SpeechDownloads/split"
transcriptions_dir = "/content/drive/MyDrive/SpeechDownloads/processed/transcriptions"
output_dir = "/content/drive/MyDrive/SpeechDownloads/processed"
os.makedirs(transcriptions_dir, exist_ok=True)

In [16]:
import os
import re
import pandas as pd

def preprocess_audio(audio_dir, output_dir, sample_rate=16000):
    """Convert audio files to WAV format with the required sample rate."""
    processed_audio_dir = os.path.join(output_dir, "audio")
    os.makedirs(processed_audio_dir, exist_ok=True)

    for file_name in os.listdir(audio_dir):
        if file_name.endswith(".mp3"):
            input_path = os.path.join(audio_dir, file_name)
            output_path = os.path.join(processed_audio_dir, f"{os.path.splitext(file_name)[0]}.wav")

            # Convert to WAV with the desired sample rate
            print(f"Processing audio: {file_name} -> {output_path}")
            audio = AudioSegment.from_mp3(input_path)
            audio = audio.set_frame_rate(sample_rate)
            audio.export(output_path, format="wav")

    return processed_audio_dir

# # Preprocess text transcriptions
# def preprocess_text(transcriptions_dir):
#     """Normalize and clean text transcriptions."""
#     transcriptions = {}
#     for file_name in os.listdir(transcriptions_dir):
#         if file_name.endswith(".txt"):
#             input_path = os.path.join(transcriptions_dir, file_name)
#             with open(input_path, "r", encoding="utf-8") as f:
#                 text = f.read().strip()
#                 # Normalize the filename for consistent matching
#                 normalized_key = normalize_filename(os.path.splitext(file_name)[0])
#                 transcriptions[normalized_key] = text
#     return transcriptions


def preprocess_text(transcriptions_dir, output_dir):
    """Normalize and clean text transcriptions."""
    transcriptions = {}
    for file_name in os.listdir(transcriptions_dir):
        if file_name.endswith(".txt"):
            input_path = os.path.join(transcriptions_dir, file_name)
            with open(input_path, "r", encoding="utf-8") as f:
                text = f.read().strip()
                # Normalize the filename for consistent matching
                normalized_key = os.path.splitext(file_name)[0]
                transcriptions[normalized_key] = text

                # Save cleaned transcription in output directory
                output_path = os.path.join(output_dir, f"{normalized_key}.txt")
                with open(output_path, "w", encoding="utf-8") as output_file:
                    output_file.write(text)

    return transcriptions

# Main Script

# Preprocess
print("Processing audio files...")
processed_audio_dir = preprocess_audio(audio_dir, output_dir)

print("Processing transcriptions...")
transcriptions = preprocess_text("/content/drive/MyDrive/SpeechDownloads/transcriptions", "/content/drive/MyDrive/SpeechDownloads/processed/transcriptions")
print(f"Transcriptions Dictionary Keys: {list(transcriptions.keys())}")



Processing audio files...
Processing audio: تعليم العربية للناطقين بغيرها كيف تحسن لغتك العربية ｜ How to learn arabic_part1.mp3 -> /content/drive/MyDrive/SpeechDownloads/processed/audio/تعليم العربية للناطقين بغيرها كيف تحسن لغتك العربية ｜ How to learn arabic_part1.wav
Processing audio: طريق الفصاحة 1 (المستوى الأول) تعلم اللغة العربية للمبتدئين دراسة شاملة من العجمة إلى الفصاحة_part1.mp3 -> /content/drive/MyDrive/SpeechDownloads/processed/audio/طريق الفصاحة 1 (المستوى الأول) تعلم اللغة العربية للمبتدئين دراسة شاملة من العجمة إلى الفصاحة_part1.wav
Processing audio: تعليم القراءة للاطفال ｜ تعلّم القراءة بحركة الفتح ｜ أسهل طريقة لتعليم القراءة للصغار مع زكريا_part1.mp3 -> /content/drive/MyDrive/SpeechDownloads/processed/audio/تعليم القراءة للاطفال ｜ تعلّم القراءة بحركة الفتح ｜ أسهل طريقة لتعليم القراءة للصغار مع زكريا_part1.wav
Processing audio: 1- تعليم القراءة في اللغة العربية الدرس الأول  Arabic  alphabet and how to read the Arabic language_part1.mp3 -> /content/drive/MyDrive

In [17]:
# # Normalize filenames by removing special characters, spaces, and parts like 'part1'
# def normalize_filename(filename):
#     # Remove 'mp3', 'part1', 'part2' and similar parts, and any special characters or spaces
#     filename = re.sub(r'\.mp3$', '', filename)  # Remove 'mp3' extension if any
#     filename = re.sub(r'\.wav$', '', filename)  # Remove 'wav' extension if any
#     filename = re.sub(r'\.txt$', '', filename)  # Remove 'txt' extension if any
#     filename = re.sub(r'\.[^.]*$', '', filename)  # Remove any file extension after the first dot
#     filename = re.sub(r'[^\w\s]', '', filename)  # Remove special characters
#     return re.sub(r'\s+', '_', filename).strip()  # Replace spaces with underscores and clean up


#     # Create metadata linking audio and text
# def create_metadata(audio_dir, transcriptions, output_path):
#     """Create a metadata CSV file linking audio and text."""
#     metadata = []
#     for file_name in os.listdir(audio_dir):
#         if file_name.endswith(".wav"):
#             base_name = os.path.splitext(file_name)[0]
#             # Normalize filenames for matching
#             audio_key = normalize_filename(base_name)
#             text = transcriptions.get(audio_key, "")
#             if text:
#                 metadata.append([os.path.join("audio", file_name), text])
#                 print(f"Metadata entry added: {file_name} -> {text[:50]}...")
#             else:
#                 print(f"No matching transcription found for: {file_name}")
#     # Save metadata as CSV
#     if metadata:
#         metadata_df = pd.DataFrame(metadata, columns=["audio_path", "text"])
#         metadata_df.to_csv(output_path, sep="|", header=False, index=False)
#         print(f"Metadata saved: {output_path} with {len(metadata)} entries.")
#     else:
#         print("No metadata entries were created. Check your inputs!")


# print("Creating metadata...")
# create_metadata("/content/drive/MyDrive/SpeechDownloads/processed/audio", transcriptions, "/content/drive/MyDrive/SpeechDownloads/processed/metadata.csv")

# print("Preprocessing complete!")



In [18]:
!pip install torch transformers librosa




In [32]:
!pip install --upgrade transformers torch librosa

Collecting transformers
  Downloading transformers-4.47.0-py3-none-any.whl.metadata (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers<0.22,>=0.21 (from transformers)
  Downloading tokenizers-0.21.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading transformers-4.47.0-py3-none-any.whl (10.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m87.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tokenizers-0.21.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m99.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tokenizers, transformers
  Attempting uninstall: tokenizers
    Found existing 

In [20]:
# import os
# import numpy as np
# import librosa
# import tensorflow as tf
# from tensorflow.keras import layers, Model
# from sklearn.model_selection import train_test_split

# # Step 1: Define Tacotron 2 Architecture (Simplified for Training)
# class Tacotron2(Model):
#     def __init__(self, input_dim, embedding_dim, output_dim):
#         super(Tacotron2, self).__init__()
#         self.encoder = layers.LSTM(embedding_dim, return_sequences=True)
#         self.decoder = layers.LSTM(output_dim, return_sequences=True)
#         self.dense = layers.Dense(output_dim)

#     def call(self, x):
#         x = self.encoder(x)
#         x = self.decoder(x)
#         x = self.dense(x)
#         return x

# # Step 2: Preprocess Audio Files (Convert to Mel Spectrograms)
# def preprocess_audio(audio_path, sr=22050, n_fft=1024, hop_length=256, n_mels=80):
#     # Load audio
#     audio, _ = librosa.load(audio_path, sr=sr)
#     # Normalize audio
#     audio = librosa.util.normalize(audio)
#     # Convert to Mel spectrogram
#     mel_spectrogram = librosa.feature.melspectrogram(
#         y=audio, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
#     )
#     mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
#     return mel_spectrogram.T  # Transpose for time-major format

# # Step 3: Load Data (Audio and Transcriptions)
# def load_data(audio_dir, transcription_dir):
#     audio_files = sorted(os.listdir(audio_dir))
#     transcription_files = sorted(os.listdir(transcription_dir))

#     audio_data, transcription_data = [], []

#     for audio_file, transcription_file in zip(audio_files, transcription_files):
#         audio_path = os.path.join(audio_dir, audio_file)
#         transcription_path = os.path.join(transcription_dir, transcription_file)

#         # Preprocess audio
#         mel_spectrogram = preprocess_audio(audio_path)
#         audio_data.append(mel_spectrogram)

#         # Load transcription
#         with open(transcription_path, "r", encoding="utf-8") as f:
#             transcription = f.read().strip()
#         transcription_data.append(transcription)

#     return audio_data, transcription_data

# # Step 4: Tokenize Transcriptions
# def tokenize_transcriptions(transcriptions):
#     # Build vocabulary
#     vocab = sorted(set("".join(transcriptions)))
#     char_to_id = {char: idx for idx, char in enumerate(vocab)}
#     id_to_char = {idx: char for char, idx in char_to_id.items()}

#     # Convert to integer sequences
#     tokenized = [[char_to_id[char] for char in text] for text in transcriptions]

#     return tokenized, char_to_id, id_to_char

# # Step 5: Train Tacotron 2
# def train_tacotron2(audio_dir, transcription_dir, output_dim=80, batch_size=32, epochs=5):
#     # Load data
#     audio_data, transcription_data = load_data(audio_dir, transcription_dir)

#     # Tokenize transcriptions
#     tokenized_transcriptions, char_to_id, _ = tokenize_transcriptions(transcription_data)

#     # Pad sequences
#     audio_data = tf.keras.preprocessing.sequence.pad_sequences(audio_data, padding="post", dtype="float32")
#     tokenized_transcriptions = tf.keras.preprocessing.sequence.pad_sequences(
#         tokenized_transcriptions, padding="post"
#     )

#     # Split into train/test sets
#     X_train, X_val, y_train, y_val = train_test_split(
#         audio_data, tokenized_transcriptions, test_size=0.1, random_state=42
#     )

#     # Define model
#     input_dim = audio_data.shape[-1]
#     model = Tacotron2(input_dim=input_dim, embedding_dim=256, output_dim=output_dim)
#     model.compile(optimizer="adam", loss="mse")

#     # Train the model
#     model.fit(
#         X_train,
#         y_train,
#         validation_data=(X_val, y_val),
#         batch_size=batch_size,
#         epochs=epochs,
#     )

#     # Save the model
#     model.save("tacotron2_model.h5")
#     print("Model saved as tacotron2_model.h5")

# # Step 6: Run the Training Workflow
# audio_dir = "/content/drive/MyDrive/SpeechDownloads/processed/audio"
# transcription_dir = "/content/drive/MyDrive/SpeechDownloads/transcriptions"
# train_tacotron2(audio_dir, transcription_dir)

In [33]:
def pad_and_align(audio_data, tokenized_transcriptions):
    max_audio_length = max([a.shape[0] for a in audio_data])  # Longest Mel spectrogram
    max_text_length = max([len(t) for t in tokenized_transcriptions])  # Longest transcription

    # Pad Mel spectrograms (to max_audio_length)
    padded_audio_data = tf.keras.preprocessing.sequence.pad_sequences(
        audio_data, maxlen=max_audio_length, padding="post", dtype="float32"
    )

    # Pad tokenized transcriptions (to max_text_length)
    padded_transcriptions = tf.keras.preprocessing.sequence.pad_sequences(
        tokenized_transcriptions, maxlen=max_text_length, padding="post"
    )

    return padded_audio_data, padded_transcriptions

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import os
import librosa

# Function to load audio data
def load_audio_files(audio_dir):
    audio_files = []
    for filename in os.listdir(audio_dir):
        if filename.endswith(".wav"):  # Adjust if you're using a different format
            file_path = os.path.join(audio_dir, filename)
            audio, _ = librosa.load(file_path, sr=22050)  # Change the sample rate if needed
            audio_files.append(audio)
    return audio_files

# Function to load transcription data
def load_transcriptions(transcription_dir):
    transcription_files = []
    for filename in os.listdir(transcription_dir):
        if filename.endswith(".txt"):  # Adjust if you're using a different format
            file_path = os.path.join(transcription_dir, filename)
            with open(file_path, 'r') as file:
                transcription = file.read().strip()  # Read the transcription
                transcription_files.append(transcription)
    return transcription_files

# Function to load both audio and transcription data
def load_data(audio_dir, transcription_dir):
    audio_data = load_audio_files(audio_dir)
    transcription_data = load_transcriptions(transcription_dir)
    return audio_data, transcription_data

# Tokenize transcriptions (you can adjust the tokenizer to fit your use case)
def tokenize_transcriptions(transcription_data):
    # Simple tokenizer that maps each character to an integer ID
    char_to_id = {char: idx for idx, char in enumerate("abcdefghijklmnopqrstuvwxyz ")}
    tokenized_transcriptions = []

    for transcription in transcription_data:
        tokenized = [char_to_id[char] for char in transcription.lower() if char in char_to_id]
        tokenized_transcriptions.append(tokenized)

    return tokenized_transcriptions, char_to_id

# Function to pad and align audio and transcription data
def pad_and_align(audio_data, tokenized_transcriptions, max_audio_len=None, max_trans_len=None):
    # Get maximum lengths if not provided
    if max_audio_len is None:
        max_audio_len = max([len(audio) for audio in audio_data])
    if max_trans_len is None:
        max_trans_len = max([len(t) for t in tokenized_transcriptions])

    # Padding audio data
    padded_audio_data = [audio.tolist() + [0] * (max_audio_len - len(audio)) if len(audio) < max_audio_len else audio.tolist()[:max_audio_len] for audio in audio_data]

    # Padding transcriptions
    padded_tokenized_transcriptions = [transcription + [0] * (max_trans_len - len(transcription)) if len(transcription) < max_trans_len else transcription[:max_trans_len] for transcription in tokenized_transcriptions]

    return padded_audio_data, padded_tokenized_transcriptions

# Define the Tacotron2 model (Simplified version for illustration)
class Tacotron2(nn.Module):
    def __init__(self, input_dim, embedding_dim, output_dim):
        super(Tacotron2, self).__init__()
        # Define layers
        self.embedding = nn.Embedding(256, embedding_dim)  # Assuming 256 character tokens
        self.lstm = nn.LSTM(input_dim, embedding_dim, batch_first=True)
        self.linear = nn.Linear(embedding_dim, output_dim)

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)
        x = self.linear(x)
        return x

# Training function for Tacotron2
def traintacotron2(audio_dir, transcription_dir, output_dim=80, batch_size=32, epochs=5):
    # Load data
    audio_data, transcription_data = load_data(audio_dir, transcription_dir)

    # Tokenize transcriptions
    tokenized_transcriptions, char_to_id = tokenize_transcriptions(transcription_data)

    # Pad and align data
    audio_data, tokenized_transcriptions = pad_and_align(audio_data, tokenized_transcriptions)

    # Split into train/test sets
    X_train, X_val, y_train, y_val = train_test_split(
        audio_data, tokenized_transcriptions, test_size=0.1, random_state=42
    )

    # Convert data to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)  # Assuming target is class indices
    y_val_tensor = torch.tensor(y_val, dtype=torch.long)

    # Create DataLoader for efficient batching
    train_data = TensorDataset(X_train_tensor, y_train_tensor)
    val_data = TensorDataset(X_val_tensor, y_val_tensor)
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

    # Define model
    input_dim = X_train_tensor.shape[-1]  # Feature size of Mel spectrograms (or audio data)
    model = Tacotron2(input_dim=input_dim, embedding_dim=256, output_dim=output_dim)

    # Define optimizer and loss function
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    criterion = nn.CrossEntropyLoss()

    # Enable mixed precision training
    scaler = torch.cuda.amp.GradScaler()

    # Training loop
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        # Loop through batches
        for inputs, targets in train_loader:
            # Zero gradients
            optimizer.zero_grad()

            # Mixed precision training
            with torch.cuda.amp.autocast():
                outputs = model(inputs)
                loss = criterion(outputs.view(-1, output_dim), targets.view(-1))

            # Scale the loss for mixed precision
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Accumulate loss
            running_loss += loss.item()

        # Print loss after every epoch
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss / len(train_loader)}")

        # Validation
        model.eval()
        with torch.no_grad():
            val_loss = 0.0
            for inputs, targets in val_loader:
                with torch.cuda.amp.autocast():
                    outputs = model(inputs)
                    loss = criterion(outputs.view(-1, output_dim), targets.view(-1))
                val_loss += loss.item()
            print(f"Validation Loss: {val_loss / len(val_loader)}")

        # Save the model after each epoch
        torch.save(model.state_dict(), f"tacotron2_epoch{epoch+1}.pth")
        print(f"Model saved as tacotron2_epoch{epoch+1}.pth")

# Example usage:
traintacotron2('/content/drive/MyDrive/SpeechDownloads/processed/audio', '/content/drive/MyDrive/SpeechDownloads/processed/transcriptions', epochs=5)


In [24]:
# import os
# import torch
# from transformers import Wav2Vec2Model, Wav2Vec2Processor
# import torchaudio
# import numpy as np
# from tqdm import tqdm

# # Load Wav2Vec2 Processor and Model
# model_name = "facebook/wav2vec2-base"
# processor = Wav2Vec2Processor.from_pretrained(model_name)
# model = Wav2Vec2Model.from_pretrained(model_name)

# # Move model to GPU if available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)

# # Function to load and preprocess audio
# def preprocess_audio_torch(file_path, sampling_rate=16000):
#     waveform, sr = torchaudio.load(file_path, normalize=True)
#     if sr != sampling_rate:
#         resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sampling_rate)
#         waveform = resampler(waveform)
#     return waveform.squeeze(0).numpy()

# # Function to extract embeddings
# def extract_embeddings(audio, processor, model):
#     inputs = processor(audio, sampling_rate=16000, return_tensors="pt", padding=True)
#     inputs = inputs.to(device)  # Move inputs to GPU
#     with torch.no_grad():
#         outputs = model(**inputs)
#     embeddings = outputs.last_hidden_state
#     return embeddings

# # Directory containing processed audio files
# audio_dir = "/content/drive/MyDrive/SpeechDownloads/processed/audio"

# # Output directory for embeddings
# output_dir = "/content/drive/MyDrive/SpeechDownloads/processed/embeddings"
# os.makedirs(output_dir, exist_ok=True)

# # Process all audio files in the directory
# for audio_file in tqdm(os.listdir(audio_dir)):
#     if audio_file.endswith(".wav"):
#         try:
#             file_path = os.path.join(audio_dir, audio_file)
#             print(f"Processing file: {file_path}")

#             # Preprocess and extract embeddings
#             audio = preprocess_audio_torch(file_path)
#             embeddings = extract_embeddings(audio, processor, model)

#             # Convert embeddings to numpy
#             embeddings_numpy = embeddings.squeeze(0).cpu().numpy()  # Move to CPU if on GPU

#             # Save embeddings as .npy file
#             output_path = os.path.join(output_dir, f"{os.path.splitext(audio_file)[0]}_embeddings.npy")
#             np.save(output_path, embeddings_numpy)
#             print(f"Saved embeddings to {output_path}")
#         except Exception as e:
#             print(f"Error processing {audio_file}: {e}")


In [25]:
# import numpy as np

# # Step 1: Read the transcriptions from the text file
# with open("/content/drive/MyDrive/SpeechDownloads/transcriptions/15 شيء لا يجب أن يراه البشر !!!.mp3_part1.txt", "r") as f:
#     transcriptions = f.readlines()

# # Step 2: Preprocess the transcriptions (e.g., convert to list of tokens or character IDs)
# # For simplicity, we'll tokenize by splitting by spaces (character-level encoding can also be done)
# # For phoneme-based transcriptions, additional preprocessing might be needed.

# # Here, we're just converting each transcription into a list of characters
# processed_transcriptions = [list(transcription.strip()) for transcription in transcriptions]

# # Optionally, you can convert characters to integer IDs using a vocabulary
# # For example, creating a simple vocabulary of characters:
# vocab = set(''.join(transcriptions))  # Get unique characters
# char_to_id = {char: idx for idx, char in enumerate(vocab)}  # Map characters to IDs

# # Convert characters to integer IDs
# transcriptions_as_ids = [[char_to_id[char] for char in transcription] for transcription in processed_transcriptions]

# # Step 3: Save the transcriptions as a .npy file
# np.save("/content/drive/MyDrive/SpeechDownloads/transcriptions.npy", transcriptions_as_ids)

# print("Transcriptions have been saved as transcriptions.npy")


In [26]:
# import numpy as np

# # Step 1: Read the transcriptions from the text file
# with open("/content/drive/MyDrive/SpeechDownloads/transcriptions/15 شيء لا يجب أن يراه البشر !!!.mp3_part1.txt", "r") as f:
#     transcriptions = f.readlines()

# # Step 2: Preprocess the transcriptions
# # Tokenizing by splitting by spaces (character-level encoding can also be done)
# processed_transcriptions = [list(transcription.strip()) for transcription in transcriptions]

# # Create a simple vocabulary of characters from the transcriptions
# vocab = set(''.join(transcriptions))  # Get unique characters
# char_to_id = {char: idx for idx, char in enumerate(vocab)}  # Map characters to IDs

# # Convert characters to integer IDs
# transcriptions_as_ids = [[char_to_id[char] for char in transcription] for transcription in processed_transcriptions]

# # Step 3: Ensure the number of transcriptions matches the number of embeddings
# # Here, we need to match the size to 33750
# desired_size = 33750

# # If there are more transcriptions than the desired size, slice them
# if len(transcriptions_as_ids) > desired_size:
#     transcriptions_as_ids = transcriptions_as_ids[:desired_size]

# # If there are fewer transcriptions than the desired size, repeat or pad them
# elif len(transcriptions_as_ids) < desired_size:
#     # Repeat the transcriptions to reach the desired size (or pad with empty sequences)
#     while len(transcriptions_as_ids) < desired_size:
#         transcriptions_as_ids.extend(transcriptions_as_ids[:desired_size - len(transcriptions_as_ids)])

# # Step 4: Save the processed transcriptions as a .npy file
# np.save("/content/drive/MyDrive/SpeechDownloads/transcriptions.npy", transcriptions_as_ids)

# print("Transcriptions have been saved as transcriptions.npy")


In [27]:
# import numpy as np
# import tensorflow as tf
# from tensorflow.keras import layers, Model

# # Define a simple Tacotron 2-like architecture (simplified for example)
# class Tacotron2(Model):
#     def __init__(self, embedding_dim, output_dim, max_length):
#         super(Tacotron2, self).__init__()
#         self.encoder = layers.LSTM(embedding_dim, return_sequences=True)
#         self.decoder = layers.LSTM(output_dim, return_sequences=True)
#         self.output_layer = layers.Dense(output_dim)  # For generating output features (e.g., mel spectrogram)
#         self.max_length = max_length  # Maximum length of transcription

#     def call(self, x):
#         x = self.encoder(x)  # Encoder outputs sequence
#         x = self.decoder(x)  # Decoder generates output sequence
#         x = self.output_layer(x)  # Output features at each timestep
#         return x

# # Prepare data for training (Wave2Vec embeddings and transcriptions)
# wave2vec_embeddings = np.load("/content/drive/MyDrive/SpeechDownloads/processed/embeddings/15 شيء لا يجب أن يراه البشر !!!_part1_embeddings.npy")
# transcriptions = np.load("/content/drive/MyDrive/SpeechDownloads/transcriptions.npy")

# # Step 1: Reshape the embeddings to include the time dimension
# wave2vec_embeddings = np.expand_dims(wave2vec_embeddings, axis=1)  # Add time dimension

# # Step 2: Check the shape of wave2vec_embeddings after reshaping
# print(f"Shape of wave2vec_embeddings after reshaping: {wave2vec_embeddings.shape}")

# # Step 3: Adjust the shape of transcriptions to match the output sequence length
# # Make sure the transcriptions have the same sequence length as the output of the model
# max_length = 7738  # Set this to the expected length of your transcriptions
# transcriptions = np.pad(transcriptions, ((0, 0), (0, max_length - transcriptions.shape[1])), 'constant')

# # Define the model
# model = Tacotron2(embedding_dim=7738, output_dim=80, max_length=max_length)
# model.compile(optimizer="adam", loss="mse")

# # Step 4: Train the model
# model.fit(wave2vec_embeddings, transcriptions, epochs=5)

# # Save the model weights
# model.save_weights("tacotron2_weights.h5")
# print("Model weights saved as tacotron2_weights.h5")


**Evaluation**

In [1]:
# pip install jiwer

Collecting jiwer
  Downloading jiwer-3.0.5-py3-none-any.whl.metadata (2.7 kB)
Collecting rapidfuzz<4,>=3 (from jiwer)
  Downloading rapidfuzz-3.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Downloading jiwer-3.0.5-py3-none-any.whl (21 kB)
Downloading rapidfuzz-3.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m46.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.0.5 rapidfuzz-3.10.1


In [None]:
# import jiwer
# import torch
# import numpy as np

# # Assuming the model is loaded and the data is available
# def evaluate_model(model, dataloader, device):
#     model.eval()
#     predictions = []
#     ground_truth = []

#     # Iterate over the validation set
#     for inputs, targets in dataloader:
#         inputs = inputs.squeeze(1).to(device)  # Remove channel dimension for the model
#         targets = targets.to(device)

#         # Perform inference
#         with torch.no_grad():
#             outputs = model(inputs)

#         # Convert output probabilities to text indices
#         predicted_transcriptions = torch.argmax(outputs, dim=2)

#         # Convert indices to text (decode predicted transcriptions)
#         decoded_preds = decode(predicted_transcriptions)
#         decoded_targets = decode(targets)

#         predictions.extend(decoded_preds)
#         ground_truth.extend(decoded_targets)

#     # Calculate Word Error Rate (WER)
#     wer_score = jiwer.wer(ground_truth, predictions)
#     return wer_score

# # Assuming the decode function converts indices to text based on your tokenization method
# def decode(sequence):
#     # Implement your method to decode indices to text
#     pass

# # Example usage:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# wer_score = evaluate_model(model, val_loader, device)
# print(f"Word Error Rate (WER): {wer_score}")


**Testing**

In [2]:
# pip install pesq

Collecting pesq
  Downloading pesq-0.0.4.tar.gz (38 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pesq
  Building wheel for pesq (setup.py) ... [?25l[?25hdone
  Created wheel for pesq: filename=pesq-0.0.4-cp310-cp310-linux_x86_64.whl size=262948 sha256=9669f09a68db77cc393586e8ec60af059185dd5d80d5562e9dd9a2534fc44367
  Stored in directory: /root/.cache/pip/wheels/c5/4e/2c/251524370c0fdd659e99639a0fbd0ca5a782c3aafcd456b28d
Successfully built pesq
Installing collected packages: pesq
Successfully installed pesq-0.0.4


In [None]:
# from pesq import pesq
# import librosa

# def compute_pesq(reference_audio, generated_audio, sr=22050):
#     # Load the reference and generated audios
#     reference, _ = librosa.load(reference_audio, sr=sr)
#     generated, _ = librosa.load(generated_audio, sr=sr)

#     # Compute PESQ score
#     pesq_score = pesq(reference, generated, sr)
#     return pesq_score

# # Example usage
# reference_audio_path = "reference.wav"
# generated_audio_path = "generated.wav"
# pesq_score = compute_pesq(reference_audio_path, generated_audio_path)
# print(f"PESQ Score: {pesq_score}")


**AB Test**

In [None]:
# from scipy import stats

# def ab_test(model_a, model_b, val_loader, device):
#     # Evaluate both models
#     wer_model_a = evaluate_model(model_a, val_loader, device)
#     wer_model_b = evaluate_model(model_b, val_loader, device)

#     print(f"Model A WER: {wer_model_a}")
#     print(f"Model B WER: {wer_model_b}")

#     # Perform t-test to see if there's a significant difference
#     # In practice, you'd collect multiple results, not just one.
#     t_stat, p_value = stats.ttest_ind(wer_model_a, wer_model_b)

#     if p_value < 0.05:
#         print("The difference between models is statistically significant!")
#     else:
#         print("No significant difference between models.")

# # Example usage:
# ab_test(model_a, model_b, val_loader, device)
