# Import All the Necessary Libraries

In [2]:
import os
import librosa
import soundfile as sf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, ReLU, BatchNormalization, Bidirectional, LSTM, TimeDistributed, Reshape, Input
from tensorflow.keras.models import Model



# Convert to Low Frequency

In [None]:
# Input and output paths
input_folder = r'/kaggle/input/librispeech-clean/LibriSpeech/test-clean'
output_folder = r'/kaggle/working/lowfreq'

# Target sampling rates (Hz) for the low-frequency copies of every recording
target_sample_rates = [2000, 4000, 8000]

# Walk through all files in the input folder
for root, _, files in os.walk(input_folder):
    # Location of this directory relative to the dataset root. It is identical
    # for every file and every target rate in the directory, so compute it once.
    relative_path = os.path.relpath(root, input_folder)

    for file in files:
        if not file.endswith('.flac'):
            continue

        # Construct full file path
        file_path = os.path.join(root, file)

        # Load the audio file at its native sampling rate (sr=None disables resampling)
        audio_data, original_sr = librosa.load(file_path, sr=None)

        # Produce one resampled copy per target sampling rate
        for target_sr in target_sample_rates:
            resampled_audio = librosa.resample(audio_data, orig_sr=original_sr, target_sr=target_sr)

            # Mirror the input folder structure below a per-rate directory
            output_path = os.path.join(output_folder, f"{target_sr}Hz", relative_path)
            os.makedirs(output_path, exist_ok=True)

            # Save the resampled audio as a .wav file; the rate is part of the
            # file name so copies of the same utterance never collide
            output_file = os.path.join(output_path, os.path.splitext(file)[0] + f'_{target_sr}Hz.wav')
            sf.write(output_file, resampled_audio, target_sr)

            # Print progress update (carriage return keeps it on one line)
            print(f"\rResampled audio saved at {output_file}", end='')

print("\nAll low-frequency audio conversion completed successfully.")



KeyboardInterrupt



# Extract Both High and Low-Resolution Features

In [None]:
# Low-resolution pass: resampled audio is read from the first folder and the
# extracted features are written to the second one.
input_folder_lr = '/kaggle/working/lowfreq'
output_folder_lr = '/kaggle/working/features/low_res'

# Make sure the destination exists before any feature file is written
os.makedirs(output_folder_lr, exist_ok=True)

# Function to calculate and save Mel-spectrogram
def save_mel_spectrogram(audio_data, sr, output_file):
    """Compute a dB-scaled Mel-spectrogram, save it as .npy and render a .png preview.

    Args:
        audio_data: Audio samples passed to ``librosa.feature.melspectrogram`` as ``y``.
        sr: Sampling rate of ``audio_data`` in Hz.
        output_file: Destination path of the ``.npy`` file. The preview image
            is written next to it with the extension swapped to ``.png``.
    """
    # Explicit submodule import: older librosa releases do not load the
    # display submodule on a bare `import librosa`.
    import librosa.display

    mel_spec = librosa.feature.melspectrogram(y=audio_data, sr=sr, n_mels=128)
    # Convert power to decibels relative to the loudest bin of this spectrogram
    mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
    np.save(output_file, mel_spec_db)

    # Swap only the trailing extension; str.replace would also rewrite any
    # '.npy' that happens to occur earlier in the path.
    image_file = os.path.splitext(output_file)[0] + '.png'

    fig = plt.figure(figsize=(10, 4))
    try:
        librosa.display.specshow(mel_spec_db, sr=sr, x_axis='time', y_axis='mel')
        plt.colorbar(format='%+2.0f dB')
        plt.savefig(image_file)
    finally:
        # Always release the figure, even if plotting fails, so a long
        # extraction run does not accumulate open figures.
        plt.close(fig)

# Function to calculate and save MFCCs
def save_mfcc(audio_data, sr, output_file):
    """Compute 13 MFCCs, save them as .npy and render a .png preview.

    Args:
        audio_data: Audio samples passed to ``librosa.feature.mfcc`` as ``y``.
        sr: Sampling rate of ``audio_data`` in Hz.
        output_file: Destination path of the ``.npy`` file. The preview image
            is written next to it with the extension swapped to ``.png``.
    """
    # Explicit submodule import: older librosa releases do not load the
    # display submodule on a bare `import librosa`.
    import librosa.display

    mfccs = librosa.feature.mfcc(y=audio_data, sr=sr, n_mfcc=13)
    np.save(output_file, mfccs)

    # Swap only the trailing extension; str.replace would also rewrite any
    # '.npy' that happens to occur earlier in the path.
    image_file = os.path.splitext(output_file)[0] + '.png'

    fig = plt.figure(figsize=(10, 4))
    try:
        librosa.display.specshow(mfccs, sr=sr, x_axis='time')
        plt.colorbar()
        plt.savefig(image_file)
    finally:
        # Always release the figure, even if plotting fails, so a long
        # extraction run does not accumulate open figures.
        plt.close(fig)

# Choose either Mel-spectrogram or MFCC.
# This is set once, before the loop: the high-resolution loop further down
# reads the same flag, so it must exist even when no .wav file is found here.
use_mfcc = True  # Change to False if you want to use Mel-spectrograms

# Walk through low-frequency files to extract features
for root, _, files in os.walk(input_folder_lr):
    for file in files:
        if not file.endswith('.wav'):
            continue

        file_path = os.path.join(root, file)
        # sr=None keeps the (already reduced) sampling rate of the file
        audio_data, sr = librosa.load(file_path, sr=None)

        # All features land in one flat folder, named after the audio file
        output_file = os.path.join(output_folder_lr, os.path.splitext(file)[0] + '.npy')
        if use_mfcc:
            save_mfcc(audio_data, sr, output_file)
        else:
            save_mel_spectrogram(audio_data, sr, output_file)

        # Print progress update (carriage return keeps it on one line)
        print(f"\rExtracted {'MFCC' if use_mfcc else 'Mel-spectrogram'} for low-res {file}", end='')

print("\nAll low-resolution feature extraction completed successfully.")

# High-resolution pass: features are taken straight from the original recordings
input_folder_hr = '/kaggle/input/librispeech-clean/LibriSpeech/test-clean'  # Original dataset path
output_folder_hr = '/kaggle/working/features/high_res'

# Make sure the destination exists before any feature file is written
os.makedirs(output_folder_hr, exist_ok=True)

# Visit every .flac recording below the dataset root
for root, _, files in os.walk(input_folder_hr):
    for file in files:
        if not file.endswith('.flac'):
            continue

        file_path = os.path.join(root, file)
        # sr=None keeps the native sampling rate of the recording
        audio_data, sr = librosa.load(file_path, sr=None)

        # Feature file is named after the recording and stored in one flat folder
        output_file = os.path.join(output_folder_hr, os.path.splitext(file)[0] + '.npy')

        # Same feature type as the low-resolution pass (controlled by `use_mfcc`)
        (save_mfcc if use_mfcc else save_mel_spectrogram)(audio_data, sr, output_file)

        # Single-line progress indicator
        print(f"\rExtracted {'MFCC' if use_mfcc else 'Mel-spectrogram'} for high-res {file}", end='')

print("\nAll high-resolution feature extraction completed successfully.")


#  Normalize and Standardize the Features

In [None]:
import os
import numpy as np
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Where the raw feature arrays are read from ...
input_folder_lr = '/kaggle/working/features/low_res'    # Low-resolution features
input_folder_hr = '/kaggle/working/features/high_res'   # High-resolution features

# ... and where their scaled counterparts are written to
output_folder_lr = '/kaggle/working/normalized_features/low_res'
output_folder_hr = '/kaggle/working/normalized_features/high_res'

# Both destinations must exist before the scaling loop starts writing
os.makedirs(output_folder_lr, exist_ok=True)
os.makedirs(output_folder_hr, exist_ok=True)

# Function to normalize the feature data to range [0, 1]
def normalize_features(features):
    """Min-max scale ``features`` with a scaler fitted on that same array.

    Args:
        features: 2-D array handed to ``MinMaxScaler.fit_transform``.

    Returns:
        The transformed array produced by ``MinMaxScaler`` with its default
        settings.
    """
    return MinMaxScaler().fit_transform(features)

# Function to standardize the feature data (mean=0, std=1)
def standardize_features(features):
    """Standardize ``features`` with a scaler fitted on that same array.

    Args:
        features: 2-D array handed to ``StandardScaler.fit_transform``.

    Returns:
        The transformed array produced by ``StandardScaler`` with its default
        settings.
    """
    return StandardScaler().fit_transform(features)

# Choose whether to normalize or standardize
use_standardization = True  # Set to False if you want normalization instead

# Resolve the scaling function once instead of re-deciding for every file
scale_features = standardize_features if use_standardization else normalize_features

# List of input folders and their corresponding output folders
folders = [
    (input_folder_lr, output_folder_lr),
    (input_folder_hr, output_folder_hr)
]

# Walk through all .npy files in the input folders
for input_folder, output_folder in folders:
    for root, _, files in os.walk(input_folder):
        # Output directory mirroring `root`; identical for every file in it
        relative_path = os.path.relpath(root, input_folder)
        output_path = os.path.join(output_folder, relative_path)

        for file in files:
            if not file.endswith('.npy'):
                continue

            file_path = os.path.join(root, file)
            features = np.load(file_path)
            original_shape = features.shape

            # The sklearn scalers only accept 2-D input. Arrays with fewer
            # than two axes become a single column; higher-rank arrays are
            # flattened onto their last axis.
            if features.ndim < 2:
                features = features.reshape(-1, 1)
            else:
                features = features.reshape(-1, original_shape[-1])

            # NOTE(review): the scalers work column-wise, and here the columns
            # are the array's last axis. If that axis is time (librosa returns
            # features as (n_coefficients, n_frames)), every frame is scaled
            # on its own rather than every coefficient - confirm this is the
            # intended behaviour.
            processed_features = scale_features(features)

            # Restore the shape the array had on disk
            processed_features = processed_features.reshape(original_shape)

            # Directories are only created where there is something to save
            os.makedirs(output_path, exist_ok=True)

            # Save the normalized or standardized features as a .npy file
            output_file = os.path.join(output_path, file)
            np.save(output_file, processed_features)

            # Print progress update (carriage return keeps it on one line)
            print(f"\rProcessed features for {file}", end='')

print("\nAll feature normalization/standardization completed successfully.")


# Create Train Dataset for Super-Resolution Model

In [None]:
# Scaled feature folders the training pairs are read from
low_res_features_path = '/kaggle/working/normalized_features/low_res'    # model inputs
high_res_features_path = '/kaggle/working/normalized_features/high_res'  # model targets

# Load features
def load_feature(file_path):
    """Load one feature array from a ``.npy`` file as ``float32``.

    Args:
        file_path: Path of the ``.npy`` file, as ``str``, ``bytes`` or
            path-like object. ``bytes`` is what ``tensor.numpy()`` yields for
            a string tensor inside ``tf.py_function``.

    Returns:
        The stored array cast to ``float32``, matching the ``tf.float32``
        output type declared for the dataset mapping.
    """
    # os.fsdecode turns bytes / path-like input into a plain str path
    features = np.load(os.fsdecode(file_path))
    # copy=False avoids a second buffer when the data already is float32
    return features.astype(np.float32, copy=False)

# Build two aligned lists: low_res_files[i] is the input for target high_res_files[i].
# Low-res features are stored as '<utterance>_<rate>Hz.npy' (one per sampling
# rate), high-res features as '<utterance>.npy'. Sorting both folders
# independently therefore yields lists of different length and order, so the
# pairs are matched by utterance id instead.
high_res_by_utterance = {
    os.path.splitext(file)[0]: f'{high_res_features_path}/{file}'
    for file in os.listdir(high_res_features_path)
    if file.endswith('.npy')
}

low_res_files, high_res_files = [], []
unmatched_low_res = 0
for file in sorted(os.listdir(low_res_features_path)):
    if not file.endswith('.npy'):
        continue

    # '<utterance>_<rate>Hz' -> '<utterance>'; names without that suffix are
    # looked up unchanged
    stem = os.path.splitext(file)[0]
    base, _, suffix = stem.rpartition('_')
    utterance_id = base if base and suffix.endswith('Hz') else stem

    high_res_file = high_res_by_utterance.get(utterance_id)
    if high_res_file is None:
        # Without a target the file cannot form a training pair
        unmatched_low_res += 1
        continue

    low_res_files.append(f'{low_res_features_path}/{file}')
    high_res_files.append(high_res_file)

if unmatched_low_res:
    print(f"Skipped {unmatched_low_res} low-res feature files without a high-res counterpart")

# Function to load and pair low-res and high-res features
def load_data(low_res_file, high_res_file):
    """Load one (input, target) training pair.

    Args:
        low_res_file: Path of the low-resolution feature file.
        high_res_file: Path of the matching high-resolution feature file.

    Returns:
        Tuple ``(low_res, high_res)`` holding the two arrays as returned by
        ``load_feature``.
    """
    return load_feature(low_res_file), load_feature(high_res_file)

# Create a TensorFlow dataset from the low-res and high-res feature pairs
def create_dataset(low_res_files, high_res_files, batch_size=16):
    """Build a batched ``tf.data`` pipeline of (low-res, high-res) feature pairs.

    Args:
        low_res_files: Paths of the low-resolution feature files.
        high_res_files: Paths of the high-resolution feature files, in the
            same order as ``low_res_files``.
        batch_size: Number of pairs per batch.

    Returns:
        A ``tf.data.Dataset`` that yields batches of two ``float32`` tensors.
    """
    def _read_pair(low_res_file, high_res_file):
        # Runs eagerly inside py_function: .numpy() exposes the path stored
        # in each string tensor.
        return load_data(low_res_file.numpy(), high_res_file.numpy())

    def _load_pair(low_res_file, high_res_file):
        # np.load is plain Python, so it has to be wrapped for graph execution.
        # NOTE(review): tensors returned by py_function carry no static shape,
        # and batch() needs every pair in a batch to have the same shape -
        # confirm the stored features satisfy this.
        return tf.py_function(
            func=_read_pair,
            inp=[low_res_file, high_res_file],
            Tout=(tf.float32, tf.float32))

    autotune = tf.data.experimental.AUTOTUNE

    path_pairs = tf.data.Dataset.from_tensor_slices((low_res_files, high_res_files))
    feature_pairs = path_pairs.map(_load_pair, num_parallel_calls=autotune)
    return feature_pairs.batch(batch_size).prefetch(autotune)

# Batched pipeline of (low-res, high-res) pairs used for training
train_dataset = create_dataset(
    low_res_files,
    high_res_files,
    batch_size=16,
)

# Inspect one low-res feature array to see the dimensions the model will receive
sample_feature = np.load(low_res_files[0])
print(sample_feature.shape)


# Design the Super-Resolution Model

In [None]:
# CNN + RNN (LSTM) Super-Resolution Network
def build_cnn_rnn_super_resolution(input_shape):
    """Build the CNN + bidirectional-LSTM super-resolution network.

    Three convolution blocks extract features, two stride-2 transposed
    convolutions upsample both spatial axes by a factor of four, a
    bidirectional LSTM runs along the upsampled time axis, and a final
    convolution emits a single channel.

    Args:
        input_shape: ``(time_steps, frequency_bins, channels)`` with concrete
            integer ``time_steps`` and ``frequency_bins``.

    Returns:
        An uncompiled ``Model`` mapping ``input_shape`` to
        ``(4 * time_steps, 4 * frequency_bins, 1)``.

    Raises:
        ValueError: If ``input_shape`` does not have three entries or its two
            spatial sizes are not fixed.
    """
    if len(input_shape) != 3 or input_shape[0] is None or input_shape[1] is None:
        raise ValueError(
            "input_shape must be (time_steps, frequency_bins, channels) with "
            f"fixed spatial sizes, got {input_shape!r}")

    # Each stride-2 transposed convolution with 'same' padding doubles both
    # spatial axes, so the two of them together give a factor of four.
    up_time = input_shape[0] * 4
    up_freq = input_shape[1] * 4
    up_channels = 64  # filters of the last transposed convolution

    inputs = Input(shape=input_shape)  # (time_steps, frequency_bins, channels)

    # CNN layers: convolution -> batch norm -> ReLU, with growing width
    x = inputs
    for filters in (64, 128, 256):
        x = Conv2D(filters, kernel_size=(3, 3), strides=(1, 1), padding='same')(x)
        x = BatchNormalization()(x)
        x = ReLU()(x)

    # Upsampling layers
    for filters in (128, up_channels):
        x = Conv2DTranspose(filters, kernel_size=(3, 3), strides=(2, 2), padding='same')(x)
        x = ReLU()(x)

    # Reshape for RNN input: fold frequency and channels into one feature axis
    x = Reshape((up_time, up_freq * up_channels))(x)

    # Bidirectional LSTM for temporal modeling; emits 2 * 128 features per step
    x = Bidirectional(LSTM(128, return_sequences=True))(x)

    # Project every time step onto the upsampled frequency axis. Reshaping the
    # LSTM output directly to `input_shape` cannot work: it holds
    # 4 * time_steps * 256 values, not time_steps * frequency_bins.
    # NOTE(review): the output is therefore 4x the input along both axes -
    # confirm the training targets have this shape.
    x = TimeDistributed(tf.keras.layers.Dense(up_freq))(x)

    # Back to the 4-D layout expected by Conv2D
    x = Reshape((up_time, up_freq, 1))(x)

    # Final output layer
    outputs = Conv2D(1, kernel_size=(3, 3), strides=(1, 1), padding='same', activation='linear')(x)

    return Model(inputs, outputs)

# Size of one input example as (time_steps, frequency_bins, channels)
input_shape = (128, 64, 1)  # Adjust based on your feature size

# Instantiate the network for that input size
model = build_cnn_rnn_super_resolution(input_shape)

# Mean squared error as the loss, minimized with Adam at a learning rate of 1e-4
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss='mse',
)

# Print the layer-by-layer overview to verify the architecture
model.summary()


# Train the Model

In [None]:
# Hold out part of the batched training data for validation. No validation
# dataset is created anywhere else, so it is carved out here.
VALIDATION_FRACTION = 0.1

# Number of batches; negative when the size is unknown or infinite
num_batches = int(train_dataset.cardinality().numpy())

if num_batches >= 2:
    num_val_batches = max(1, int(num_batches * VALIDATION_FRACTION))
    # take/skip split the same deterministic batch sequence, so the two parts
    # never overlap and re-running the cell gives the same split
    val_dataset = train_dataset.take(num_val_batches)
    fit_dataset = train_dataset.skip(num_val_batches)
    monitored_metric = 'val_loss'
else:
    # Too little data (or unknown size) to split: train on everything and
    # let early stopping watch the training loss instead
    val_dataset = None
    fit_dataset = train_dataset
    monitored_metric = 'loss'

# Train the model. `batch_size` is deliberately not passed: the dataset is
# already batched, and fit() must not be given a batch size for dataset inputs.
history = model.fit(
    fit_dataset,     # Low-res and high-res training pairs (low_res_mfcc, high_res_mfcc)
    epochs=100,      # Adjust the number of epochs as needed
    validation_data=val_dataset,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor=monitored_metric, patience=10, restore_best_weights=True)]  # Early stopping for efficiency
)
