***Loading Audio Data***

In [2]:
import zipfile
import os
import librosa

# Path to the ZIP file
zip_file_path = "E:\\fma_small.zip"

# Directory to extract files
output_dir = "D:\\audio_sample"

# Function to get total size of extracted files in GB
def get_extracted_size(output_dir):
    total_size = sum(os.path.getsize(os.path.join(root, file)) for root, _, files in os.walk(output_dir) for file in files)
    total_size_gb = total_size / (1024 ** 3)  # Convert bytes to GB
    return total_size_gb

# Function to extract all files from a ZIP archive and load audio files in chunks of approximately 5 GB each
def extract_all_and_load_in_chunks(zip_file, output_dir, target_chunk_size_gb=5):
    audio_files = []
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        # Get a list of all files in the archive
        file_list = zip_ref.namelist()

        # Calculate total size of the archive
        total_size_gb = sum(zip_ref.getinfo(file).file_size for file in file_list) / (1024 ** 3)

        # Calculate number of chunks based on target chunk size
        num_chunks = int(total_size_gb / target_chunk_size_gb) + 1

        # Extract and load data in chunks
        for i in range(num_chunks):
            start_index = i * len(file_list) // num_chunks
            end_index = (i + 1) * len(file_list) // num_chunks
            
            # Print the current chunk being processed
            print(f"Processing chunk {i + 1}/{num_chunks}")

            # Extract and load audio files in the current chunk
            for file in file_list[start_index:end_index]:
                zip_ref.extract(file, path=output_dir)

                # Check if the extracted file is an audio file
                if file.endswith('.mp3'):  # Adjust file extension as needed
                    file_path = os.path.join(output_dir, file)
                    # Load audio file using librosa
                    try:
                        y, sr = librosa.load(file_path, sr=None, mono=True)
                        audio_files.append((file_path, y, sr))
                    except Exception as e:
                        print(f"Error loading audio file '{file_path}': {str(e)}")

    return audio_files

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Extract all files from the ZIP archive and load them in chunks of approximately 5 GB each
audio_files = extract_all_and_load_in_chunks(zip_file_path, output_dir)

# Now you have a list of tuples containing file paths, audio data, and sample rates
# Each tuple represents one audio file in the sample


Processing chunk 1/2


In [10]:
import os
import librosa
import IPython.display as ipd

# Directory containing the audio files
AUDIO_DIR = "D:\\audio_sample"

# Function to get the path of an audio file
def get_audio_paths(dir_path):
    audio_paths = []
    for root, _, files in os.walk(dir_path):
        for file in files:
            if file.endswith('.mp3'):  # Adjust file extension as needed
                audio_paths.append(os.path.join(root, file))
    return audio_paths

# Example: Load and listen to audio files
for audio_file in get_audio_paths(AUDIO_DIR):
    print('File: {}'.format(audio_file))

    try:
        # Load audio file using Librosa
        x, sr = librosa.load(audio_file, sr=None, mono=True)

        # Get information about the loaded audio data
        duration_sec = librosa.get_duration(y=x, sr=sr)
        num_samples = len(x)

        print('Duration: {:.2f}s, {} samples'.format(duration_sec, num_samples))

        # Set the start and end times for playback
        start_sec, end_sec = 7, 17  # Adjust start and end times as needed

        # Listen to the audio within the specified time range
        ipd.Audio(data=x[int(start_sec*sr):int(end_sec*sr)], rate=sr)
    except Exception as e:
        print(f'Error processing {audio_file}: {str(e)}')
        continue  # Skip processing this file and move to the next one


File: D:\audio_sample\fma_small\000\000002.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000005.mp3
Duration: 30.00s, 1323119 samples
File: D:\audio_sample\fma_small\000\000010.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000140.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000141.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000148.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000182.mp3
Duration: 30.00s, 1323119 samples
File: D:\audio_sample\fma_small\000\000190.mp3
Duration: 30.00s, 1323119 samples
File: D:\audio_sample\fma_small\000\000193.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000194.mp3
Duration: 30.00s, 1323119 samples
File: D:\audio_sample\fma_small\000\000197.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_small\000\000200.mp3
Duration: 29.98s, 1321967 samples
File: D:\audio_sample\fma_sm

  x, sr = librosa.load(audio_file, sr=None, mono=True)


***Pre-Processing Audio Data***

In [1]:
import os
import numpy as np
import librosa
import soundfile as sf  # Import soundfile for WAV support

def get_audio_paths(directory):
    audio_paths = []  # List to store audio file paths
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith(".mp3"):  # Adjust file extension as needed
                audio_paths.append(os.path.join(root, file))
    return audio_paths

def process_in_chunks(audio_data):
    yield audio_data, 0  # Yield the entire audio data as one chunk

def enhance_audio_chunk(chunk):
    try:
        # Apply audio effects to the chunk (e.g., equalization, noise reduction, compression)
        # For equalization and noise reduction, you can implement custom processing
        
        # Apply equalization (example: increase low frequencies)
        equalized_audio = chunk * np.array([1.2 if f < 500 else 1.0 for f in range(len(chunk))])
        
        # Apply noise reduction (example: simple moving average filter)
        window_size = 100  # Adjust window size as needed
        
        if len(equalized_audio) < window_size:
            # If audio data is too short, skip noise reduction
            noise_reduced_audio = equalized_audio
        else:
            noise_reduced_audio = np.convolve(equalized_audio, np.ones(window_size) / window_size, mode='same')
        
        # Apply dynamic range compression
        threshold = 0.2  # Adjust threshold as needed
        compression_ratio = 2  # Adjust compression ratio as needed
        compressed_audio = compress_signal(noise_reduced_audio, threshold, compression_ratio)
        
        # Apply normalization
        normalized_audio = librosa.util.normalize(compressed_audio)
        
        enhanced_audio_chunk = normalized_audio
        
        return enhanced_audio_chunk
    except Exception as e:
        print("Error in enhance_audio_chunk:", e)
        return chunk

def compress_signal(signal, threshold, ratio):
    """
    Apply dynamic range compression to the input signal.

    Args:
    - signal (ndarray): Input audio signal.
    - threshold (float): Threshold above which compression starts.
    - ratio (float): Compression ratio.

    Returns:
    - compressed_signal (ndarray): Compressed audio signal.
    """
    # Compute the mask for compression
    mask = np.where(np.abs(signal) > threshold, 1, 0)
    # Apply compression to the masked signal
    compressed_signal = signal * (1 - mask) + mask * (signal - threshold) / ratio
    return compressed_signal

def adjust_energy_level(chunk, min_energy, max_energy):
    """
    Adjust the energy level of the audio chunk to be within an acceptable range.

    Args:
    - chunk (ndarray): Input audio chunk.
    - min_energy (float): Minimum acceptable energy level.
    - max_energy (float): Maximum acceptable energy level.

    Returns:
    - adjusted_chunk (ndarray): Adjusted audio chunk.
    """
    try:
        # Calculate the energy of the audio chunk
        energy = np.sum(np.abs(chunk) ** 2)
        
        # Perform energy level adjustment if necessary
        if energy < min_energy:
            # Increase the volume to bring the energy level up to the minimum
            adjustment_factor = np.sqrt(min_energy / energy)
            adjusted_chunk = chunk * adjustment_factor
        elif energy > max_energy:
            # Decrease the volume to bring the energy level down to the maximum
            adjustment_factor = np.sqrt(max_energy / energy)
            adjusted_chunk = chunk * adjustment_factor
        else:
            # Energy level is within the acceptable range, no adjustment needed
            adjusted_chunk = chunk
        
        return adjusted_chunk
    except Exception as e:
        print("Error in adjust_energy_level:", e)
        return chunk

# Constants
AUDIO_DIR = "D:\\audio_sample"
OUTPUT_DIR = "D:\\processed_audio_wav"  # Adjust the output directory as needed
os.makedirs(OUTPUT_DIR, exist_ok=True)  # Ensure the output directory exists
MIN_ENERGY = 1e6  # Adjust the minimum energy threshold as needed
MAX_ENERGY = 1e10  # Adjust the maximum energy threshold as needed

# Get audio paths
audio_paths = get_audio_paths(AUDIO_DIR)

# Process audio data in chunks
for folder_path in audio_paths:
    try:
        audio_data, sr = librosa.load(folder_path, sr=None, mono=True)
        
        # Create subfolder structure in the output directory
        relative_path = os.path.relpath(folder_path, AUDIO_DIR)
        output_folder = os.path.join(OUTPUT_DIR, os.path.dirname(relative_path))
        os.makedirs(output_folder, exist_ok=True)
        
        print("Pre-processing folder:", os.path.dirname(relative_path))
        
        for chunk, chunk_index in process_in_chunks(audio_data):
           # print("Pre- Processing f:", os.path.basename(folder_path))
            
            # Apply audio enhancement to the chunk
            enhanced_chunk = enhance_audio_chunk(chunk)
           # print("Enhancing audio for folder:", os.path.basename(folder_path))
            
            # Adjust energy level if necessary
            adjusted_chunk = adjust_energy_level(enhanced_chunk, MIN_ENERGY, MAX_ENERGY)
            
            # Save enhanced audio chunk as WAV file in the subfolder
            output_wav_path = os.path.join(output_folder, f"{os.path.basename(folder_path)}.wav")
            sf.write(output_wav_path, adjusted_chunk, sr)
            
            print("Saved enhanced audio for folder:", os.path.basename(folder_path), "in folder:", output_folder)
    except Exception as e:
        print("Error processing folder:", folder_path)
        print(e)


Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000002.mp3 in folder: D:\processed_audio_wav\fma_small\000
Error processing folder: D:\audio_sample\fma_small\000\000002_chunk_0.mp3

Pre-processing folder: fma_small\000


  audio_data, sr = librosa.load(folder_path, sr=None, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Saved enhanced audio for folder: 000005.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000010.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000140.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000141.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000148.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000182.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000190.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-processing folder: fma_small\000
Saved enhanced audio for folder: 000193.mp3 in folder: D:\processed_audio_wav\fma_small\000
Pre-p

In [5]:
import os
import subprocess

def convert_to_mp3(input_path, output_folder):
    try:
        # Correct file extension if necessary
        if input_path.lower().endswith('.mp3.wav'):
            input_path = input_path[:-4]  # Remove the '.wav' part from the file name
        
        # Construct output path
        output_filename = os.path.splitext(os.path.basename(input_path))[0] + ".mp3"
        output_path = os.path.join(output_folder, output_filename)
        
        # Convert using ffmpeg
        subprocess.run(['ffmpeg', '-i', input_path, '-codec:a', 'libmp3lame', '-qscale:a', '2', output_path])
        
        print("Converted", input_path, "to MP3:", output_path)
    except Exception as e:
        print("Error converting:", input_path)
        print(e)

def convert_all_to_mp3(input_dir, output_dir):
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Iterate through all subdirectories in the input directory
    for root, dirs, _ in os.walk(input_dir):
        for folder in dirs:
            # Construct input and output paths for the current subfolder
            subfolder_input_path = os.path.join(root, folder)
            subfolder_output_path = os.path.join(output_dir, os.path.basename(subfolder_input_path))
            os.makedirs(subfolder_output_path, exist_ok=True)
            
            # Iterate through all files in the current subfolder
            for file in os.listdir(subfolder_input_path):
                if file.lower().endswith('.wav'):
                    input_path = os.path.join(subfolder_input_path, file)
                    convert_to_mp3(input_path, subfolder_output_path)

# Constants
AUDIO_DIR = "D:\\processed_audio_wav"  # Input directory containing audio files
MP3_OUTPUT_DIR = "D:\\mp3_processed_audio"  # Output directory for converted MP3 files

# Convert all audio files in each subdirectory of the input directory to MP3
convert_all_to_mp3(AUDIO_DIR, MP3_OUTPUT_DIR)


Error converting: D:\processed_audio_wav\fma_small\000\000002.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000005.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000010.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000140.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000141.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000148.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000182.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_audio_wav\fma_small\000\000190.mp3
[WinError 2] The system cannot find the file specified
Error converting: D:\processed_a

In [7]:
import zipfile
import os

def extract_tracks_csv(zip_file_path, output_dir):
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        for file_info in zip_ref.infolist():
            if file_info.filename.endswith('tracks.csv'):
                zip_ref.extract(file_info, output_dir)

# Example usage
ZIP_FILE_PATH = r"C:\Users\admin\Downloads\fma_metadata.zip"
OUTPUT_DIR = r"D:\track_metadata.csv"

extract_tracks_csv(ZIP_FILE_PATH, OUTPUT_DIR)


In [15]:
import pandas as pd

# Load track metadata with specified dtype to avoid mixed types warning
tracks = pd.read_csv(r"D:\track_metadata.csv\fma_metadata\tracks.csv", dtype=str)

# Flatten the hierarchy by selecting necessary columns
tracks_selected = tracks[['Unnamed: 0', 'comments', 'date_created', 'duration', 'favorites', 'genre_top', 'genres', 'title', 'album', 'artist', 'set']]

# Extract album metadata
albums = tracks_selected['album'].apply(lambda x: pd.Series(x.split('\t')))
albums.columns = [f'album_{i}' for i in range(1, len(albums.columns) + 1)]

# Extract artist metadata
artists = tracks_selected['artist'].apply(lambda x: pd.Series(x.split('\t')))
artists.columns = [f'artist_{i}' for i in range(1, len(artists.columns) + 1)]

# Extract set metadata
sets = tracks_selected['set'].apply(lambda x: pd.Series(x.split('\t')))
sets.columns = [f'set_{i}' for i in range(1, len(sets.columns) + 1)]

# Preprocess track metadata
# Drop original 'album', 'artist', and 'set' columns
tracks_selected.drop(['album', 'artist', 'set'], axis=1, inplace=True)

# Concatenate selected columns with extracted album, artist, and set metadata
tracks_preprocessed = pd.concat([tracks_selected, albums, artists, sets], axis=1)

# Convert date columns to datetime
tracks_preprocessed['date_created'] = pd.to_datetime(tracks_preprocessed['date_created'])

# Display the first few rows of preprocessed track metadata
print("\nPreprocessed Track Metadata:")
print(tracks_preprocessed.head())

# Display the shape of the preprocessed dataset
print("\nPreprocessed Tracks Dataset Shape:", tracks_preprocessed.shape)


KeyError: "['comments', 'date_created', 'duration', 'favorites', 'genre_top', 'genres', 'title'] not in index"