In [None]:
! pip install yt-dlp

In [None]:
! pip install pydub

In [None]:
! pip install spleeter

In [None]:
! pip install speechrecognition

In [None]:
import os
import yt_dlp
from IPython.display import Audio
from pydub import AudioSegment, silence
import shutil
from spleeter.separator import Separator
import speech_recognition as sr
import pandas as pd

# Download Video

In [None]:
def download_youtube_audio_yt_dlp(video_url, output_path="downloaded_audio"):
    """
    Download the best available audio stream for a URL into a directory.

    Parameters:
        video_url (str): The URL handed to yt-dlp.
        output_path (str): Directory that receives the files; created if missing.
            Files are named '<title>.<ext>' via the yt-dlp output template.
    """
    os.makedirs(output_path, exist_ok=True)

    filename_template = os.path.join(output_path, '%(title)s.%(ext)s')

    # NOTE(review): no postprocessor is configured here; confirm whether
    # 'extractaudio' / 'audioformat' have any effect through the Python API.
    downloader_options = dict(
        format='bestaudio/best',
        extractaudio=True,
        audioformat='mp3',
        outtmpl=filename_template,
        ignoreerrors=True,  # Ignore errors such as private videos
    )

    with yt_dlp.YoutubeDL(downloader_options) as downloader:
        try:
            downloader.download([video_url])
        except yt_dlp.utils.DownloadError as download_error:
            print(f"Skipping video due to error: {download_error}")

# Convert audio from .webm/.m4a to .wav

In [None]:
def convert_webm_to_wav(output_path, target_rate=16000):
    """
    Convert every '.webm' and '.m4a' file in a directory to WAV and delete the source.

    Parameters:
        output_path (str): Directory whose entries are scanned (not recursive).
        target_rate (int): Frame rate applied to the audio before it is exported.
    """
    # File suffix -> container format name passed to AudioSegment.from_file.
    container_formats = (('.webm', 'webm'), ('.m4a', 'm4a'))

    for entry in os.listdir(output_path):
        source_format = next(
            (fmt for suffix, fmt in container_formats if entry.endswith(suffix)),
            None,
        )
        if source_format is None:
            continue  # neither .webm nor .m4a: leave the file untouched

        source_path = os.path.join(output_path, entry)
        wav_path = os.path.splitext(source_path)[0] + '.wav'

        resampled = AudioSegment.from_file(source_path, format=source_format)
        resampled = resampled.set_frame_rate(target_rate)
        resampled.export(wav_path, format='wav')
        print(f"Converted '{source_path}' to '{wav_path}'.")

        os.remove(source_path)
        print(f"Deleted original file: '{source_path}'.")

# Split Audio into Chunks

In [None]:
def split_audio_into_big_chunks(input_audio_path, chunk_duration_ms=5*60*1000):
    """
    Split a given audio file into chunks of specified duration and delete the original file.

    Chunks are written next to the input as '<base name>_chunk_<n>.wav', with n
    starting at 1. The original file is removed only after every chunk has been
    exported, so a failed export never leaves the audio lost.

    Parameters:
        input_audio_path (str): The path to the input audio file.
        chunk_duration_ms (int): The duration of each chunk in milliseconds (default is 5 minutes).

    Raises:
        ValueError: If chunk_duration_ms is not positive (a non-positive length
            would produce no chunks and still delete the original).
    """
    if chunk_duration_ms <= 0:
        raise ValueError("chunk_duration_ms must be a positive number of milliseconds")

    # Load the audio file
    audio = AudioSegment.from_file(input_audio_path)

    # Chunks go into the same directory as the input, named after it
    output_directory = os.path.dirname(input_audio_path)
    base_name = os.path.splitext(os.path.basename(input_audio_path))[0]

    # Export one chunk at a time instead of materialising the whole list first
    chunk_starts = range(0, len(audio), chunk_duration_ms)
    for idx, start_ms in enumerate(chunk_starts, start=1):
        chunk_path = os.path.join(output_directory, f"{base_name}_chunk_{idx}.wav")
        audio[start_ms:start_ms + chunk_duration_ms].export(chunk_path, format="wav")
        print(f"Exported {chunk_path}")

    # Delete the original only once all chunks are safely on disk
    os.remove(input_audio_path)

    print(f"Audio file '{input_audio_path}' split into chunks and replaced in the original directory.")

In [None]:
def process_all_audio_files_in_directory(directory_path):
    """
    Split every '.wav' / '.mp3' file found directly inside a directory into big chunks.

    Each matching regular file is handed to split_audio_into_big_chunks, which
    replaces it with its chunks.

    Parameters:
        directory_path (str): The directory containing the audio files to process.
    """
    audio_suffixes = (".wav", ".mp3")

    for entry in os.listdir(directory_path):
        candidate = os.path.join(directory_path, entry)

        # Only regular files with an audio suffix are processed
        if not (entry.endswith(audio_suffixes) and os.path.isfile(candidate)):
            continue

        print(f"Processing file: {candidate}")
        split_audio_into_big_chunks(candidate)

# Separate Vocals from Music

In [None]:
def separate_vocals_from_chunks(directory_path):
    """
    Run Spleeter's 2-stem separation on every file in a directory and keep only the vocals.

    For each regular file '<name>.<ext>' directly inside the directory, the vocal
    stem is saved as '<name>_vocal.wav' in that same directory. The per-file
    folder Spleeter writes its stems into is then removed, and finally the input
    file itself is deleted (also when no vocal stem was found).

    Parameters:
        directory_path (str): The directory containing the audio files to process.
    """
    separator = Separator('spleeter:2stems')

    # Loop over a snapshot of the directory; files created below are not revisited
    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)

        # Sub-directories cannot be separated (or removed with os.remove): skip them
        if not os.path.isfile(file_path):
            continue

        print(f"Processing file: {file_path}")
        stem_name = os.path.splitext(filename)[0]

        # Use Spleeter to separate the audio and save in the same directory
        separator.separate_to_file(file_path, directory_path)

        # The stems are looked up in '<directory>/<name>/'
        temp_folder = os.path.join(directory_path, stem_name)
        vocals_path = os.path.join(directory_path, f"{stem_name}/vocals.wav")
        output_vocal_path = os.path.join(directory_path, f"{stem_name}_vocal.wav")

        # Move the vocal stem next to the input and drop the accompaniment
        if os.path.exists(vocals_path):
            os.rename(vocals_path, output_vocal_path)
            print(f"Vocal track saved as: {output_vocal_path}")

            accompaniment_path = os.path.join(directory_path, f"{stem_name}/accompaniment.wav")
            if os.path.exists(accompaniment_path):
                os.remove(accompaniment_path)
                print(f"Deleted accompaniment: {accompaniment_path}")

        # Clean up the folder created by Spleeter, whatever it still contains
        if os.path.exists(temp_folder):
            shutil.rmtree(temp_folder)
            print(f"Deleted temporary folder: {temp_folder}")

        # Remove input audio
        os.remove(file_path)
        print(f"Deleted original file: '{filename}'.")

# Remove Silences

In [None]:
def remove_silences(input_dir, min_silence_len=500, silence_thresh=-50, keep_silence=200):
    """
    Strip silent stretches from every '*_vocal.wav' file in a directory.

    Each matching file is split on silence and the remaining pieces are
    concatenated into '<name>_without_silence.wav' in the same directory. The
    input file is deleted afterwards, even when no non-silent audio was found
    (in which case nothing is written).

    Parameters:
        input_dir (str): Directory containing the '*_vocal.wav' files.
        min_silence_len (int): Minimum length of silence, in milliseconds, to
            consider it a split point.
        silence_thresh (int): Silence threshold in dBFS (adjust based on your audio).
        keep_silence (int): Amount of silence, in milliseconds, to leave at the
            beginning and end of each segment.
    """
    for file_name in os.listdir(input_dir):
        if not file_name.endswith('_vocal.wav'):
            continue

        input_audio_path = os.path.join(input_dir, file_name)
        merged_filename = f"{os.path.splitext(file_name)[0]}_without_silence.wav"

        audio = AudioSegment.from_wav(input_audio_path)

        chunks = silence.split_on_silence(
            audio,
            min_silence_len=min_silence_len,
            silence_thresh=silence_thresh,
            keep_silence=keep_silence
        )

        if chunks:
            # Concatenate the non-silent pieces back into a single segment
            merged_audio = sum(chunks)
            del chunks
            merged_audio.export(os.path.join(input_dir, merged_filename), format="wav")
            print(f"Merged audio saved as '{merged_filename}'.")
        else:
            print("No non-silence audio found. Nothing was saved.")

        # Remove input audio
        os.remove(input_audio_path)
        print(f"Deleted original file: '{file_name}'.")

# Splitting Audio and Transcribing

In [None]:
def remove_last_word(audio_chunk):
    """
    Transcribe an audio chunk and cut the estimated last word off its audio.

    The chunk is exported to a temporary WAV file, transcribed with Google
    Speech Recognition (language "ar-EG"), and shortened by the average
    per-word duration (chunk length divided by the number of words).

    Parameters:
        audio_chunk: Segment to transcribe; must support export(), len() in
            milliseconds and slicing.

    Returns:
        tuple: (trimmed chunk, transcript) when at least one word was
        recognised, otherwise (unchanged chunk, "").

    NOTE(review): the returned transcript still contains the last word whose
    audio was trimmed off - confirm this mismatch is intended by the caller.
    """
    import tempfile  # local import: only this helper needs it

    recognizer = sr.Recognizer()

    # A unique temp file avoids collisions on a shared 'temp_chunk.wav' in the
    # working directory, and it is always cleaned up below.
    fd, temp_filename = tempfile.mkstemp(suffix=".wav")
    os.close(fd)

    try:
        audio_chunk.export(temp_filename, format="wav")

        with sr.AudioFile(temp_filename) as source:
            audio = recognizer.record(source)

            try:
                transcript = recognizer.recognize_google(audio, language="ar-EG")
                print(f"Transcript: '{transcript}'")

                words = transcript.split()
                if len(words) > 0:
                    # Assume every word takes the same time; drop one word's worth
                    duration_per_word = len(audio_chunk) / len(words)
                    end_time = len(words) * duration_per_word - duration_per_word

                    return audio_chunk[:int(end_time)], transcript

            except sr.UnknownValueError:
                print("Could not understand audio")
            except sr.RequestError:
                print("Could not request results from Google Speech Recognition service")
    finally:
        if os.path.exists(temp_filename):
            os.remove(temp_filename)

    return audio_chunk, ""

In [None]:
def split_audio_into_chunks(input_audio_path, output_drive_path, file_name, chunk_length_ms=10000):
    """
    Cut a WAV file into short chunks, transcribe each one and save audio + text twice.

    Every chunk goes through remove_last_word. If its first transcribed word
    starts with the last word of the previous chunk, that first word is dropped
    from both audio and transcript. Chunk i is written as '<i>.wav' and
    '<i>.txt' into two folders named after the file (without extension): one
    under output_drive_path and one next to the input file. The input file is
    deleted at the end.

    Parameters:
        input_audio_path (str): Path of the WAV file to split.
        output_drive_path (str): Root directory for the Drive copy of the chunks.
        file_name (str): Name of the input file; used to name the output folders.
        chunk_length_ms (int): Length of each chunk in milliseconds.
    """
    audio = AudioSegment.from_wav(input_audio_path)

    # The local copy lives next to the input file. This is derived from the
    # argument rather than from a notebook-level 'output_path' global.
    local_root = os.path.dirname(input_audio_path)
    local_output_dir = os.path.splitext(os.path.join(local_root, file_name))[0]  # directory to save here
    output_dir = os.path.splitext(os.path.join(output_drive_path, file_name))[0]  # directory to save in drive

    # Drive first, then local: the order in which files are written and reported
    target_dirs = (output_dir, local_output_dir)
    for target_dir in target_dirs:
        os.makedirs(target_dir, exist_ok=True)

    # Ceiling division: a trailing partial chunk still counts
    num_chunks = len(audio) // chunk_length_ms + (1 if len(audio) % chunk_length_ms != 0 else 0)

    last_word = ""

    for i in range(num_chunks):
        start_time = i * chunk_length_ms
        end_time = min(start_time + chunk_length_ms, len(audio))

        chunk = audio[start_time:end_time]

        chunk, transcript = remove_last_word(chunk)

        # Drop a leading word that repeats the previous chunk's last word
        if last_word and transcript:
            current_words = transcript.split()
            if current_words and current_words[0].startswith(last_word):
                chunk_duration = len(chunk)
                duration_per_word = chunk_duration / len(current_words)
                chunk = chunk[int(duration_per_word):]
                transcript = " ".join(current_words[1:])

        # An empty transcript keeps the previous last_word
        if transcript:
            last_word = transcript.split()[-1]

        for target_dir in target_dirs:
            chunk_filename = os.path.join(target_dir, f"{i + 1}.wav")
            chunk.export(chunk_filename, format="wav")
            print(f"Exported: {chunk_filename}")

        for target_dir in target_dirs:
            transcript_filename = os.path.join(target_dir, f"{i + 1}.txt")
            with open(transcript_filename, "w") as transcript_file:
                transcript_file.write(transcript)
                print(f"Transcript saved: {transcript_filename}")

    os.remove(input_audio_path)
    print(f"Deleted original file: '{input_audio_path}'.")

In [None]:
def print_clean_txt_file(file_path):
    """
    Print a text file with leading/trailing whitespace stripped from every line.

    A missing file, or any other error while reading, is reported with a
    printed message instead of being raised.

    Parameters:
        file_path (str): Path of the text file to print.
    """
    try:
        with open(file_path, 'r') as handle:
            stripped_lines = [raw_line.strip() for raw_line in handle]
        print("\n".join(stripped_lines))
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
    except Exception as error:
        print(f"An error occurred: {error}")

In [None]:
def splitting_audio(output_path, output_drive_path):
    """
    Run split_audio_into_chunks on every '.wav' entry of a directory.

    Parameters:
        output_path (str): Local directory holding the WAV files to split.
        output_drive_path (str): Drive root passed through to split_audio_into_chunks.
    """
    # Snapshot the matching names first; the splitter adds folders to this directory
    wav_names = [entry for entry in os.listdir(output_path) if entry.endswith('.wav')]

    for wav_name in wav_names:
        split_audio_into_chunks(os.path.join(output_path, wav_name), output_drive_path, wav_name)

# Create Transcript CSV

In [None]:
def create_transcript_csv(audio_dir, transcript_dir, gender, age, mood, environment):
    """
    Build a DataFrame with one row per '.wav' file in a directory.

    The transcript for '<name>.wav' is read from '<name>.txt' in transcript_dir
    (stripped of surrounding whitespace), or left empty when that file does not
    exist. Gender, age, mood and environment are repeated on every row, and the
    accent is always "ar-EG".

    Parameters:
        audio_dir (str): Directory containing the WAV chunks.
        transcript_dir (str): Directory containing the matching '.txt' transcripts.
        gender, age, mood, environment: Speaker/recording labels applied to all rows.

    Returns:
        pandas.DataFrame: Columns Audio, Transcript, Gender, Age, Accent, Mood,
        Environment, Duration (Duration in seconds).
    """
    accent = "ar-EG"

    # Only these three columns vary from row to row
    audio_paths, transcript_texts, durations = [], [], []

    for wav_name in os.listdir(audio_dir):
        if not wav_name.endswith(".wav"):
            continue

        wav_path = os.path.join(audio_dir, wav_name)
        txt_path = os.path.join(transcript_dir, f"{os.path.splitext(wav_name)[0]}.txt")

        text = ""
        if os.path.exists(txt_path):
            with open(txt_path, 'r') as handle:
                text = handle.read().strip()

        audio_paths.append(wav_path)
        transcript_texts.append(text)
        durations.append(AudioSegment.from_wav(wav_path).duration_seconds)

    row_count = len(audio_paths)

    return pd.DataFrame({
        'Audio': audio_paths,
        'Transcript': transcript_texts,
        'Gender': [gender] * row_count,
        'Age': [age] * row_count,
        'Accent': [accent] * row_count,
        'Mood': [mood] * row_count,
        'Environment': [environment] * row_count,
        'Duration': durations
    })

In [None]:
def empty_directory(directory_path):
    """
    Delete everything inside a directory while keeping the directory itself.

    Parameters:
        directory_path (str): Directory whose files and sub-directories are removed.
    """
    for entry in os.listdir(directory_path):
        entry_path = os.path.join(directory_path, entry)
        # Directories are removed with their contents, anything else is unlinked
        remover = shutil.rmtree if os.path.isdir(entry_path) else os.unlink
        remover(entry_path)


In [None]:
# Mount Google Drive at /content/drive (Colab only).
from google.colab import drive
drive.mount('/content/drive')
# Drive folder that receives the transcribed chunks and dataset.csv.
output_drive_path = '/content/drive/My Drive/speech_finalproject/data_processing'

Mounted at /content/drive


In [None]:
# Source URL handed to yt-dlp (the URL carries a `list=` playlist parameter).
video_url = 'https://www.youtube.com/watch?v=tBM9Y1dgEJU&list=PLCpK4282MCT80bGKKd_Ia8-y9HzKQcyOP'
# Local working directory for downloads and intermediate files; emptied after the CSV is written.
output_path = './downloaded_audio'

In [None]:
# Full processing pipeline; each step works in place on the files in output_path.
download_youtube_audio_yt_dlp(video_url, output_path=output_path) # download the audio
convert_webm_to_wav(output_path) # convert .webm/.m4a downloads to .wav
process_all_audio_files_in_directory(output_path) # split the audio into five-minute chunks
separate_vocals_from_chunks(output_path) # separate vocals from the chunks (remove music)
remove_silences(output_path) # remove silences
splitting_audio(output_path, output_drive_path) # split the audio into 10-second chunks and save them on Drive

In [None]:
# Folder names are discovered in the local working directory, but the rows are
# built from the copies of those folders stored on Drive.
chunk_dir_names = [
    name for name in os.listdir(output_path)
    if name.endswith('_without_silence')
]

# One DataFrame per chunk folder; transcripts live next to the audio chunks.
csv_files = [
    create_transcript_csv(
        os.path.join(output_drive_path, name),
        os.path.join(output_drive_path, name),
        "male", 33, "serious", "clean",
    )
    for name in chunk_dir_names
]
del chunk_dir_names

file_path = os.path.join(output_drive_path,'dataset.csv')

if csv_files:
    combined_df = pd.concat(csv_files, ignore_index=True)
    # Appended without a header row so repeated runs keep extending the same file.
    combined_df.to_csv(file_path, mode='a', header=False, index=False)

    ### empty Disk and RAM
    empty_directory(output_path) # empty the local directory
    del combined_df # delete final dataframe
else:
    # pd.concat raises on an empty list; keep the local files so nothing
    # unprocessed is lost when no chunk folder was found.
    print("No '_without_silence' folders found; nothing was appended to dataset.csv.")

del csv_files # delete temporary list of per-folder dataframes

In [None]:
# Sanity check: show what is left in the local working directory.
# Uncomment the next line to empty it manually.
# empty_directory(output_path)
os.listdir(output_path)

[]

# Get The Total Duration Collected

In [None]:
from datetime import timedelta

# Load the accumulated dataset; the first row of the file is used as the header.
file_path = os.path.join(output_drive_path,'dataset.csv')
df = pd.read_csv(file_path, header=0)
del file_path

# Row masks shared by all the statistics below.
is_male = df['Gender'] == 'male'
has_empty_transcript = df['Transcript'].isna() | (df['Transcript'] == '')

# Durations in seconds: all male rows, the empty-transcript ones, and the rest.
total_duration_male = df.loc[is_male, "Duration"].sum()
empty_transcript_duration_male = df.loc[has_empty_transcript & is_male, 'Duration'].sum()
net_duration_male = total_duration_male - empty_transcript_duration_male

formatted_total_duration_male = str(timedelta(seconds=int(total_duration_male)))
formatted_net_duration_male = str(timedelta(seconds=int(net_duration_male)))

# Row counts.
empty_transcript_male_count = df.loc[is_male & has_empty_transcript].shape[0]
transcript_male_count = df.loc[is_male, 'Gender'].count()

print("\n".join([
    "Information of Male Records",
    f"Total Duration: {formatted_total_duration_male}",  # 66:05:49
    f"Net Duration: {formatted_net_duration_male}",  # 42:39:46
    f"Percentage of Empty Transcripts Duration: {(empty_transcript_duration_male/total_duration_male)*100:.2f}%",
    f"Total Transcripts: {transcript_male_count}",
    f"Empty Transcripts: {empty_transcript_male_count}",
    f"Percentage of Empty Transcripts Count: {(empty_transcript_male_count/transcript_male_count)*100:.2f}%",
]))

Information of Male Records
Total Duration: 2 days, 18:05:49
Net Duration: 1 day, 18:39:46
Percentage of Empty Transcripts Duration: 35.45%
Total Transcripts: 26913
Empty Transcripts: 8491
Percentage of Empty Transcripts Count: 31.55%


In [None]:
# Same statistics as for the male records, restricted to Gender == 'female'.
# Relies on `df` and `timedelta` from the preceding cell.
is_female = df['Gender'] == 'female'
has_empty_transcript = df['Transcript'].isna() | (df['Transcript'] == '')

# Durations in seconds: all female rows, the empty-transcript ones, and the rest.
total_duration_female = df.loc[is_female, "Duration"].sum()
empty_transcript_duration_female = df.loc[has_empty_transcript & is_female, 'Duration'].sum()
net_duration_female = total_duration_female - empty_transcript_duration_female

formatted_total_duration_female = str(timedelta(seconds=int(total_duration_female)))
formatted_net_duration_female = str(timedelta(seconds=int(net_duration_female)))

# Row counts.
empty_transcript_female_count = df.loc[is_female & has_empty_transcript].shape[0]
transcript_female_count = df.loc[is_female, 'Gender'].count()

print("\n".join([
    "Information of Female Records",
    f"Total Duration: {formatted_total_duration_female}",  # 62:33:10
    f"Net Duration: {formatted_net_duration_female}",  # 38:44:03 (62:30)
    f"Percentage of Empty Transcripts Duration: {(empty_transcript_duration_female/total_duration_female)*100:.2f}%",
    f"Total Transcripts: {transcript_female_count}",
    f"Empty Transcripts: {empty_transcript_female_count}",
    f"Percentage of Empty Transcripts Count: {(empty_transcript_female_count/transcript_female_count)*100:.2f}%",
]))

Information of Female Records
Total Duration: 2 days, 14:33:10
Net Duration: 1 day, 14:44:03
Percentage of Empty Transcripts Duration: 38.08%
Total Transcripts: 26004
Empty Transcripts: 8793
Percentage of Empty Transcripts Count: 33.81%


In [None]:
# Combine the male and female figures computed in the two preceding cells.
total_duration = total_duration_male + total_duration_female
formatted_total_duration = str(timedelta(seconds=int(total_duration)))
total_net_duration = net_duration_male + net_duration_female
formatted_total_net_duration = str(timedelta(seconds=int(total_net_duration)))

print("Total Information")
print(f"Total Duration: {formatted_total_duration} hours") # 128:38:59
print(f"Net Duration: {formatted_total_net_duration} hours") # 81:23:50
# Empty share = (total - net) / total; net / total would be the non-empty share.
print(f"Percentage of Empty Transcripts Duration: {((total_duration - total_net_duration)/total_duration)*100:.2f}%")
print(f"Total Transcripts: {len(df)}")
df

Total Information
Total Duration: 5 days, 8:38:59 hours
Net Duration: 3 days, 9:23:50 hours
Percentage of Empty Transcripts Duration: 63.27%
Total Transcripts: 52917


Unnamed: 0,Audio,Transcript,Gender,Age,Accent,Mood,Environment,Duration
0,/content/drive/My Drive/speech_finalproject/da...,يمكن بعد فتاه المصنع بعد الفيلم ده الفيلم ده ه...,female,55,ar-EG,casual,clean,9.230000
1,/content/drive/My Drive/speech_finalproject/da...,ثقه في نفسي ويمكن الثقه دي هي اللي خلتني ما ان...,female,55,ar-EG,casual,clean,9.544989
2,/content/drive/My Drive/speech_finalproject/da...,بعد نجاح فيلم فتاه المصنع ونجاحي انا كممثله وا...,female,55,ar-EG,casual,clean,9.230000
3,/content/drive/My Drive/speech_finalproject/da...,ممثلين الناس كلها بتقول انت عامله دور حلو انت ...,female,55,ar-EG,casual,clean,9.410998
4,/content/drive/My Drive/speech_finalproject/da...,,female,55,ar-EG,casual,clean,10.000000
...,...,...,...,...,...,...,...,...
52912,/content/drive/My Drive/speech_finalproject/da...,والخير للمؤمن الايمان والتقوى هو ده اللي هياخد...,male,33,ar-EG,serious,clean,9.333000
52913,/content/drive/My Drive/speech_finalproject/da...,الدنيا فعند الله ثواب الدنيا والاخره يعني ربنا...,male,33,ar-EG,serious,clean,9.473000
52914,/content/drive/My Drive/speech_finalproject/da...,جزاكم الله خيرا سبحان الله والحمد لله ولا اله ...,male,33,ar-EG,serious,clean,9.333000
52915,/content/drive/My Drive/speech_finalproject/da...,جزاك الله خير,male,33,ar-EG,serious,clean,6.666000


In [None]:
# Drop the notebook-level reference to the dataset DataFrame once the summaries are printed.
del df

# Add Headers Only Once: at the Beginning or at the End of Data Collection

In [None]:
# file_path = os.path.join(output_drive_path,'dataset.csv')
# headers = ["Audio", "Transcript", "Gender", "Age", "Accent", "Mood", "Environment", "Duration"]
# # Read the CSV and assign the header
# df = pd.read_csv(file_path, header=None, names=headers)

# # Save the updated CSV
# df.to_csv(file_path, index=False)
# print("Header added successfully.")
# df

In [None]:
# drive_path = '/content/drive/My Drive/speech_finalproject/Playlist_2'

In [None]:
# for dir in os.listdir(drive_path):
#   if not dir.endswith('.csv'):
#     for file_name in os.listdir(os.path.join(drive_path, dir)):
#       if file_name.endswith('.wav'):
#         chunk_filename = os.path.join(drive_path, dir, file_name)
#         chunk = AudioSegment.from_wav(chunk_filename)
#         output_filename = os.path.join(output_path, dir, file_name)
#         os.makedirs(os.path.dirname(output_filename), exist_ok=True)
#         chunk.export(output_filename, format="wav")
#         print(f"Exported: {output_filename}")

#         output_filename_2 = os.path.join(output_drive_path, dir, file_name)
#         os.makedirs(os.path.dirname(output_filename_2), exist_ok=True)
#         chunk.export(output_filename_2, format="wav")
#         print(f"Exported: {output_filename_2}")

#       if file_name.endswith('.txt'):
#         transcript_filename = os.path.join(drive_path, dir, file_name)
#         with open(transcript_filename, "r") as transcript_file:
#           transcript = transcript_file.read()
#         output_t_filename = os.path.join(output_path, dir, file_name)
#         os.makedirs(os.path.dirname(output_t_filename), exist_ok=True)
#         with open(output_t_filename, "w") as trans_file:
#           trans_file.write(transcript)
#           print(f"Transcript saved: {output_t_filename}")

#         output_t_filename_2 = os.path.join(output_drive_path, dir, file_name)
#         os.makedirs(os.path.dirname(output_t_filename_2), exist_ok=True)
#         with open(output_t_filename_2, "w") as trans_file:
#           trans_file.write(transcript)
#           print(f"Transcript saved: {output_t_filename_2}")