In [1]:
import os
import librosa
import numpy as np
import random
import soundfile as sf
import sounddevice as sd

# Input recordings, given relative to the notebook's working directory
audio1_path = r"tuplu.wav"  # "tuplu" recording
audio2_path = r"tzaca.wav"  # "tzaca" recording

# Sampling rate shared by both recordings
common_sr = 16000

# Create the output directory up front; an already existing one is fine
output_folder = "overlapped_dataset"
os.makedirs(output_folder, exist_ok=True)


In [2]:
import numpy as np
import librosa as lb
import soundfile as sf

# Input recordings and the sampling rate both are resampled to on load
audio1_path, audio2_path = r"tuplu.wav", r"tzaca.wav"
common_sr = 16000

def calculate_energy(channel_data):
    """Return the energy of a signal, i.e. the sum of its squared samples."""
    squared_samples = np.square(channel_data)
    return squared_samples.sum()

def select_dominant_channel(audio):
    """Return whichever of the first two channels carries more energy.

    `audio` is indexed as `audio[channel]`. The first channel wins only when
    its energy is strictly greater; on a tie the second channel is returned.

    Raises:
        ValueError: if `audio` is not a 2-D array.
    """
    if audio.ndim != 2:
        raise ValueError("The audio does not have two channels.")
    left_channel, right_channel = audio[0], audio[1]
    if calculate_energy(left_channel) > calculate_energy(right_channel):
        return left_channel
    return right_channel

# Destination files for the extracted single-channel signals
output_path1 = "selected_channel_audio1.wav"
output_path2 = "selected_channel_audio2.wav"

# Read both recordings at the shared rate; mono=False keeps the channels separate
audio1, _ = lb.load(audio1_path, sr=common_sr, mono=False)
audio2, _ = lb.load(audio2_path, sr=common_sr, mono=False)

# Stop early unless each recording was loaded as a 2-D array
if audio1.ndim != 2:
    raise ValueError("The audio1 does not have two channels.")
if audio2.ndim != 2:
    raise ValueError("The audio2 does not have two channels.")

# Keep, per recording, whichever of the first two channels carries more energy
selected_channel_audio1 = select_dominant_channel(audio1)
selected_channel_audio2 = select_dominant_channel(audio2)

# Persist the chosen channels as separate files
sf.write(output_path1, selected_channel_audio1, common_sr)
sf.write(output_path2, selected_channel_audio2, common_sr)

# Read the saved channels back for further processing
final_audio1, _ = lb.load(output_path1, sr=common_sr, mono=False)
final_audio2, _ = lb.load(output_path2, sr=common_sr, mono=False)


In [3]:
import numpy as np
import librosa as lb

# Input recording and the text files that receive the detected intervals
filePath = 'selected_channel_audio1.wav'
speech_times_file = "speech_intervals_a1.txt"
non_speech_times_file = "non_speech_intervals_a1.txt"

# Duration thresholds, in seconds
MIN_SILENCE_DURATION = 0.5  # Shorter gaps are not reported as non-speech
MIN_SPEECH_DURATION = 0.3   # Shorter speech runs are discarded

# Load at the file's own sampling rate (sr=None disables resampling)
y, sr = lb.load(filePath, sr=None)

# Framing: 30 ms analysis windows, one taken every 0.5 s.
# NOTE(review): the hop is much longer than the window, so only about 6% of
# the samples contribute to the energy estimate - confirm this is intended.
winSize = int(np.ceil(30e-3 * sr))
hopLength = int(0.5 * sr)
sigFrames = lb.util.frame(y, frame_length=winSize, hop_length=hopLength)

# Short-term energy (STE): squared samples summed over axis 0, one value per frame
sigSTE = np.square(sigFrames).sum(axis=0)

# Smooth the energy with a moving average over `window_size` frames
window_size = 5
smoothed_STE = np.convolve(sigSTE, np.full(window_size, 1 / window_size), mode='same')

# Data-driven threshold: frames above the 40th percentile of the smoothed
# energy are treated as speech (so roughly 40% of frames are always non-speech)
energy_threshold = np.percentile(smoothed_STE, 40)
is_speech = smoothed_STE > energy_threshold

# Time stamp (in seconds) for each frame index
frame_times = lb.frames_to_time(np.arange(sigSTE.shape[0]), sr=sr, hop_length=hopLength)

def get_initial_intervals(is_speech, frame_times):
    """Turn per-frame speech flags into raw (start, end) time intervals.

    A run opens at the time of its first speech frame and closes at the time
    of the first non-speech frame after it. A run still open after the last
    frame is closed at `frame_times[-1]`.
    """
    segments = []
    run_start = None

    for idx, flag in enumerate(is_speech):
        if flag:
            # Only the first speech frame of a run opens it
            if run_start is None:
                run_start = frame_times[idx]
            continue
        if run_start is not None:
            # First non-speech frame after a run closes it
            segments.append((run_start, frame_times[idx]))
            run_start = None

    # Close a run that reaches the end of the signal
    if run_start is not None:
        segments.append((run_start, frame_times[-1]))

    return segments

def merge_intervals(intervals, max_gap=0.001):
    """Merge intervals that overlap, touch, or lie within `max_gap` of each other.

    Args:
        intervals: iterable of (start, end) pairs, in any order.
        max_gap: largest gap (same unit as the intervals) that is still bridged.

    Returns:
        A list of merged (start, end) tuples sorted by start time; an empty
        list when `intervals` is empty.
    """
    if not intervals:
        return []

    # Sort intervals by start time
    sorted_intervals = sorted(intervals, key=lambda x: x[0])
    merged = []
    current_start, current_end = sorted_intervals[0]

    for start, end in sorted_intervals[1:]:
        if start - current_end <= max_gap:
            # Extend only forwards: an interval nested inside the current one
            # must not pull the merged end backwards.
            current_end = max(current_end, end)
        else:
            # Add the current interval and start a new one
            merged.append((current_start, current_end))
            current_start, current_end = start, end

    merged.append((current_start, current_end))
    return merged

def process_intervals(is_speech, frame_times):
    """Build the final speech intervals: detect runs, merge them, drop short ones.

    Intervals shorter than MIN_SPEECH_DURATION are removed after merging.
    """
    merged = merge_intervals(get_initial_intervals(is_speech, frame_times))

    kept = []
    for begin, finish in merged:
        # Keep only runs long enough to count as speech
        if finish - begin >= MIN_SPEECH_DURATION:
            kept.append((begin, finish))
    return kept

def get_non_speech_intervals(speech_intervals, total_duration, min_silence=None):
    """Generate non-speech intervals from the gaps between speech intervals.

    Args:
        speech_intervals: iterable of (start, end) speech intervals, any order;
            they may overlap or be nested.
        total_duration: length of the recording, same unit as the intervals.
        min_silence: smallest gap that is reported. Defaults to the
            module-level MIN_SILENCE_DURATION when omitted.

    Returns:
        A list of (start, end) gaps, each at least `min_silence` long, covering
        the leading gap, the gaps between speech, and the trailing gap.
    """
    if min_silence is None:
        min_silence = MIN_SILENCE_DURATION

    non_speech = []
    current_time = 0

    for start, end in sorted(speech_intervals, key=lambda x: x[0]):
        if start - current_time >= min_silence:
            non_speech.append((current_time, start))
        # Never move the cursor backwards: a nested or overlapping interval
        # would otherwise produce a gap that overlaps speech.
        current_time = max(current_time, end)

    # Add final non-speech segment if needed
    if total_duration - current_time >= min_silence:
        non_speech.append((current_time, total_duration))

    return non_speech

# Recording length in seconds, needed to close the trailing non-speech gap
total_duration = len(y) / sr

# Detect speech for recording 1, then derive the gaps between the speech runs
speech_intervals_a1 = process_intervals(is_speech=is_speech, frame_times=frame_times)
non_speech_intervals_a1 = get_non_speech_intervals(
    speech_intervals=speech_intervals_a1,
    total_duration=total_duration,
)

def save_intervals(intervals, output_file):
    """Write intervals to `output_file`, one "start,end" line per interval.

    Lines are ordered by start time and both values are written with three
    decimals. An existing file is overwritten.
    """
    with open(output_file, "w") as handle:
        handle.writelines(
            f"{begin:.3f},{finish:.3f}\n"
            for begin, finish in sorted(intervals, key=lambda pair: pair[0])
        )

# Write both interval lists for recording 1, then report where they went
save_intervals(intervals=speech_intervals_a1, output_file=speech_times_file)
save_intervals(intervals=non_speech_intervals_a1, output_file=non_speech_times_file)
print(f"Speech intervals saved to {speech_times_file}")
print(f"Non-speech intervals saved to {non_speech_times_file}")

Speech intervals saved to speech_intervals_a1.txt
Non-speech intervals saved to non_speech_intervals_a1.txt


In [4]:
import numpy as np
import librosa as lb

# Input recording and the text files that receive the detected intervals
filePath = 'selected_channel_audio2.wav'
speech_times_file = "speech_intervals_a2.txt"
non_speech_times_file = "non_speech_intervals_a2.txt"

# Duration thresholds, in seconds
MIN_SILENCE_DURATION = 0.5  # Shorter gaps are not reported as non-speech
MIN_SPEECH_DURATION = 0.3   # Shorter speech runs are discarded

# Load at the file's own sampling rate (sr=None disables resampling)
y, sr = lb.load(filePath, sr=None)

# Framing: 30 ms analysis windows, one taken every 0.5 s.
# NOTE(review): the hop is much longer than the window, so only about 6% of
# the samples contribute to the energy estimate - confirm this is intended.
winSize = int(np.ceil(30e-3 * sr))
hopLength = int(0.5 * sr)
sigFrames = lb.util.frame(y, frame_length=winSize, hop_length=hopLength)

# Short-term energy (STE): squared samples summed over axis 0, one value per frame
sigSTE = np.square(sigFrames).sum(axis=0)

# Smooth the energy with a moving average over `window_size` frames
window_size = 5
smoothed_STE = np.convolve(sigSTE, np.full(window_size, 1 / window_size), mode='same')

# Data-driven threshold: frames above the 40th percentile of the smoothed
# energy are treated as speech (so roughly 40% of frames are always non-speech)
energy_threshold = np.percentile(smoothed_STE, 40)
is_speech = smoothed_STE > energy_threshold

# Time stamp (in seconds) for each frame index
frame_times = lb.frames_to_time(np.arange(sigSTE.shape[0]), sr=sr, hop_length=hopLength)

def get_initial_intervals(is_speech, frame_times):
    """Turn per-frame speech flags into raw (start, end) time intervals.

    A run opens at the time of its first speech frame and closes at the time
    of the first non-speech frame after it. A run still open after the last
    frame is closed at `frame_times[-1]`.
    """
    segments = []
    run_start = None

    for idx, flag in enumerate(is_speech):
        if flag:
            # Only the first speech frame of a run opens it
            if run_start is None:
                run_start = frame_times[idx]
            continue
        if run_start is not None:
            # First non-speech frame after a run closes it
            segments.append((run_start, frame_times[idx]))
            run_start = None

    # Close a run that reaches the end of the signal
    if run_start is not None:
        segments.append((run_start, frame_times[-1]))

    return segments

def merge_intervals(intervals, max_gap=0.001):
    """Merge intervals that overlap, touch, or lie within `max_gap` of each other.

    Args:
        intervals: iterable of (start, end) pairs, in any order.
        max_gap: largest gap (same unit as the intervals) that is still bridged.

    Returns:
        A list of merged (start, end) tuples sorted by start time; an empty
        list when `intervals` is empty.
    """
    if not intervals:
        return []

    # Sort intervals by start time
    sorted_intervals = sorted(intervals, key=lambda x: x[0])
    merged = []
    current_start, current_end = sorted_intervals[0]

    for start, end in sorted_intervals[1:]:
        if start - current_end <= max_gap:
            # Extend only forwards: an interval nested inside the current one
            # must not pull the merged end backwards.
            current_end = max(current_end, end)
        else:
            # Add the current interval and start a new one
            merged.append((current_start, current_end))
            current_start, current_end = start, end

    merged.append((current_start, current_end))
    return merged

def process_intervals(is_speech, frame_times):
    """Build the final speech intervals: detect runs, merge them, drop short ones.

    Intervals shorter than MIN_SPEECH_DURATION are removed after merging.
    """
    merged = merge_intervals(get_initial_intervals(is_speech, frame_times))

    kept = []
    for begin, finish in merged:
        # Keep only runs long enough to count as speech
        if finish - begin >= MIN_SPEECH_DURATION:
            kept.append((begin, finish))
    return kept

def get_non_speech_intervals(speech_intervals, total_duration, min_silence=None):
    """Generate non-speech intervals from the gaps between speech intervals.

    Args:
        speech_intervals: iterable of (start, end) speech intervals, any order;
            they may overlap or be nested.
        total_duration: length of the recording, same unit as the intervals.
        min_silence: smallest gap that is reported. Defaults to the
            module-level MIN_SILENCE_DURATION when omitted.

    Returns:
        A list of (start, end) gaps, each at least `min_silence` long, covering
        the leading gap, the gaps between speech, and the trailing gap.
    """
    if min_silence is None:
        min_silence = MIN_SILENCE_DURATION

    non_speech = []
    current_time = 0

    for start, end in sorted(speech_intervals, key=lambda x: x[0]):
        if start - current_time >= min_silence:
            non_speech.append((current_time, start))
        # Never move the cursor backwards: a nested or overlapping interval
        # would otherwise produce a gap that overlaps speech.
        current_time = max(current_time, end)

    # Add final non-speech segment if needed
    if total_duration - current_time >= min_silence:
        non_speech.append((current_time, total_duration))

    return non_speech

# Detect speech in recording 2 and derive its non-speech gaps.
speech_intervals_a2 = process_intervals(is_speech, frame_times)
total_duration = len(y) / sr  # Length of recording 2 in seconds
# The gaps must come from recording 2's own speech intervals; using
# speech_intervals_a1 here would write recording 1's gaps into the a2 file.
non_speech_intervals_a2 = get_non_speech_intervals(speech_intervals_a2, total_duration)

def save_intervals(intervals, output_file):
    """Write intervals to `output_file`, one "start,end" line per interval.

    Lines are ordered by start time and both values are written with three
    decimals. An existing file is overwritten.
    """
    with open(output_file, "w") as handle:
        handle.writelines(
            f"{begin:.3f},{finish:.3f}\n"
            for begin, finish in sorted(intervals, key=lambda pair: pair[0])
        )

# Write both interval lists for recording 2, then report where they went
save_intervals(intervals=speech_intervals_a2, output_file=speech_times_file)
save_intervals(intervals=non_speech_intervals_a2, output_file=non_speech_times_file)
print(f"Speech intervals saved to {speech_times_file}")
print(f"Non-speech intervals saved to {non_speech_times_file}")

Speech intervals saved to speech_intervals_a2.txt
Non-speech intervals saved to non_speech_intervals_a2.txt


In [6]:
import numpy as np
import librosa as lb
import soundfile as sf
import random
import os

# Single-channel recordings and their speech-interval files
file1_path = "selected_channel_audio1.wav"
file2_path = "selected_channel_audio2.wav"
speech_intervals_a1_file = "speech_intervals_a1.txt"
speech_intervals_a2_file = "speech_intervals_a2.txt"

# Output directory for the generated samples; created when missing
output_folder = "datasets"
os.makedirs(output_folder, exist_ok=True)

# Text file listing the labelled intervals of every generated sample
info_file = os.path.join(output_folder, "datasetinfo.txt")

def load_intervals(file_path):
    """Load (start, end) intervals from a text file with one "start,end" per line.

    Blank lines (for example a trailing empty line) are skipped instead of
    failing the float conversion.

    Returns:
        A list of (start, end) float tuples in file order.

    Raises:
        ValueError: if a non-blank line does not hold exactly two
            comma-separated numbers.
    """
    intervals = []
    with open(file_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            start, end = map(float, line.split(","))
            intervals.append((start, end))
    return intervals

def extract_audio(y, sr, start, end):
    """Return the part of `y` between `start` and `end`, both given in seconds.

    The times are converted to sample indices by multiplying with `sr` and
    truncating to int.
    """
    first, last = (int(moment * sr) for moment in (start, end))
    return y[first:last]

def overlay_audio(base_audio, overlay_audio, overlay_position, sr):
    """Add `overlay_audio` onto `base_audio` starting at `overlay_position` seconds.

    The overlay is truncated when it would run past the end of the base. When
    the position lies at or beyond the end of the base, nothing is mixed in.

    Args:
        base_audio: array of samples; it is modified in place.
        overlay_audio: array of samples to add.
        overlay_position: start of the overlay within the base, in seconds.
        sr: sampling rate used to convert the position to a sample index.

    Returns:
        `base_audio` (the same object that was passed in).

    Raises:
        ValueError: if `overlay_position` is negative.
    """
    position_sample = int(overlay_position * sr)
    if position_sample < 0:
        # A negative index would silently wrap around to the end of the base
        raise ValueError("overlay_position must not be negative")

    # Number of base samples available from the insertion point onwards
    room = len(base_audio) - position_sample
    if room <= 0:
        return base_audio

    clipped_overlay = overlay_audio[:room]
    end_sample = position_sample + len(clipped_overlay)
    base_audio[position_sample:end_sample] += clipped_overlay
    return base_audio

def generate_no_intervals(yes_intervals, total_duration):
    """Return the intervals of [0, total_duration] not covered by `yes_intervals`.

    The "yes" intervals may be unsorted and may overlap; the caller's list is
    left untouched.
    """
    gaps = []
    cursor = 0.0

    # Walk the "yes" intervals in sorted order, emitting the gap before each
    for yes_start, yes_end in sorted(yes_intervals):
        if cursor < yes_start:
            gaps.append((cursor, yes_start))
        # Advance only forwards so overlapping intervals cannot reopen a gap
        if yes_end > cursor:
            cursor = yes_end

    # Whatever remains after the last "yes" interval is a final gap
    if cursor < total_duration:
        gaps.append((cursor, total_duration))

    return gaps

def extract_continuous_speech(y, sr, speech_intervals, target_duration=45.0):
    """Build a speech-only signal of `target_duration` seconds from random intervals.

    The speech intervals are visited in random order and their audio is
    concatenated. The interval that would overshoot the target is trimmed, so
    the result holds exactly `round(target_duration * sr)` samples whenever
    enough speech is available, and all available speech otherwise.

    Args:
        y: 1-D array of audio samples.
        sr: sampling rate of `y`.
        speech_intervals: list of (start, end) times in seconds; not modified.
        target_duration: desired length of the result in seconds.

    Returns:
        A 1-D array with the concatenated speech segments.

    Raises:
        ValueError: if no audio could be extracted (for example when
            `speech_intervals` is empty).
    """
    # Shuffle a copy so the caller's interval list keeps its order
    shuffled_intervals = list(speech_intervals)
    random.shuffle(shuffled_intervals)

    target_samples = int(round(target_duration * sr))
    collected_samples = 0
    extracted_audio = []

    for start, end in shuffled_intervals:
        if collected_samples >= target_samples:
            break
        # Same seconds-to-samples conversion as extract_audio, then trim the
        # piece to what is still missing so the target is met exactly.
        piece = y[int(start * sr):int(end * sr)][:target_samples - collected_samples]
        if len(piece) == 0:
            continue
        extracted_audio.append(piece)
        collected_samples += len(piece)

    if not extracted_audio:
        raise ValueError("no speech audio available to build the base segment")

    return np.concatenate(extracted_audio)

# Read both single-channel recordings at their own sampling rates
y1, sr1 = lb.load(file1_path, sr=None)
y2, sr2 = lb.load(file2_path, sr=None)

# Read the speech intervals detected earlier for each recording
speech_intervals_a1 = load_intervals(file_path=speech_intervals_a1_file)
speech_intervals_a2 = load_intervals(file_path=speech_intervals_a2_file)

# Both signals are summed sample by sample, so they must share one sampling rate
if sr1 != sr2:
    raise ValueError(f"Sampling rates differ ({sr1} Hz vs {sr2} Hz); cannot overlay the recordings.")

# Generate 100 samples of overlapping audio
generated_count = 0
with open(info_file, "w") as info:
    info.write("SampleNumber,Start,End,Label\n")
    for i in range(100):
        try:
            # Build the base track from shuffled speech intervals of file1
            base_audio = extract_continuous_speech(y1, sr1, speech_intervals_a1)

            # Work with the real length of the base: it can be shorter than the
            # nominal 45 s, and positions/labels must stay inside it.
            base_duration = len(base_audio) / sr1

            # Select two random intervals for overlay audio of 2-5 seconds each
            overlay_intervals = []
            while len(overlay_intervals) < 2:
                selected_interval = random.choice(speech_intervals_a2)
                segment_duration = random.uniform(2.0, 5.0)
                start_point = random.uniform(
                    selected_interval[0],
                    max(selected_interval[1] - segment_duration, selected_interval[0])
                )
                overlay_intervals.append((start_point, start_point + segment_duration))

            # Extract overlay audio
            overlay_audio1 = extract_audio(y2, sr2, overlay_intervals[0][0], overlay_intervals[0][1])
            overlay_audio2 = extract_audio(y2, sr2, overlay_intervals[1][0], overlay_intervals[1][1])
            overlay_duration1 = len(overlay_audio1) / sr1
            overlay_duration2 = len(overlay_audio2) / sr1

            # Randomly position each overlay so that it fits inside the base;
            # the upper bound is clamped at 0 for overlays longer than the base.
            overlay_position1 = random.uniform(0, max(base_duration - overlay_duration1, 0.0))
            overlay_position2 = random.uniform(0, max(base_duration - overlay_duration2, 0.0))

            # Overlay the audio segments
            # NOTE(review): summed signals may exceed the [-1, 1] range; check
            # how the writer handles that for the chosen file format.
            base_audio = overlay_audio(base_audio, overlay_audio1, overlay_position1, sr1)
            base_audio = overlay_audio(base_audio, overlay_audio2, overlay_position2, sr1)

            # "yes" intervals relative to the base audio, clipped to its length
            yes_intervals = [
                (overlay_position1, min(overlay_position1 + overlay_duration1, base_duration)),
                (overlay_position2, min(overlay_position2 + overlay_duration2, base_duration))
            ]

            # Generate "no" intervals over the actual length of the sample
            no_intervals = generate_no_intervals(yes_intervals, base_duration)

            # Save the audio file
            output_path = os.path.join(output_folder, f"sample_{i + 1}.wav")
            sf.write(output_path, base_audio, sr1)

            # Combine "no" and "yes" intervals, tagging them with labels
            labeled_intervals = [(start, end, "no") for start, end in no_intervals] + \
                                [(start, end, "yes") for start, end in yes_intervals]

            # Sort all intervals by their start time
            labeled_intervals = sorted(labeled_intervals, key=lambda x: x[0])

            # Save timing and labels to the dataset info file in sorted order
            for start, end, label in labeled_intervals:
                info.write(f"{i + 1},{start:.3f},{end:.3f},{label}\n")

            generated_count += 1

        except ValueError as e:
            print(f"Skipping sample {i + 1} due to insufficient intervals: {e}")

# Report the number of samples actually written (skipped ones are not counted)
print(f"{generated_count} overlapping samples generated and timing info saved in {info_file}.")


100 overlapping samples generated and timing info saved in datasets\datasetinfo.txt.
