In [1]:
#import audio
import librosa

In [2]:
# Load the test fixture; the call's two return values are unpacked into
# `samples` and `sr`, which later cells pass as audio samples and sample rate.
samples, sr = librosa.load('../tests/silence_10s.mp3')

In [3]:
import random
import numpy as np

# Chunk extraction

### Helper function: `wraparound_extract()`

In [4]:
def wraparound_extract(original, begin, length):
    '''
    Extracts elements from numpy.array in a "wraparound" fashion
    
    Extracts a certain number of elements from 
    a numpy.array starting at a certain position.
    If the chosen position and length go
    past the end of the array, the extraction
    "wraps around" to the beginning of the numpy.array
    as many times as necessary. For instance:
    
    wraparound_extract(
        original = [0, 5, 10],
        begin = 1, 
        length = 7) -> [5, 10, 0, 5, 10, 0, 5]
    
    Args:
        original (np.array): the original array; must be non-empty
        begin (int): beginning position to extract. Reduced modulo
            the length of `original`, so positions at or past the
            end wrap back to the start
        length (int): number of elements to extract; must be >= 0
    
    Returns:
        np.array holding `length` elements. When no wrap is needed
        the result is a plain slice of `original`; otherwise it is
        a newly concatenated array.
    
    Raises:
        AssertionError: if `original` is not a numpy array
        ValueError: if `original` is empty or `length` is negative
    '''

    assert isinstance(original, np.ndarray)
    len_original = original.shape[0]

    # An empty array cannot be wrapped (the modulo below would divide by zero)
    if len_original == 0:
        raise ValueError('cannot extract from an empty array')
    # A negative length would otherwise be read as a negative slice bound
    if length < 0:
        raise ValueError('length must be non-negative')

    # Get `head`: the array after the beginning position
    begin = begin % len_original
    head = original[begin:]
    len_head = head.shape[0]

    # Number of elements we require for full wrap-around
    wrap_needed = length - len_head

    # Generate the desired list, wrapped if necessary
    if wrap_needed > 0:
        # Whole passes over `original`, plus a partial pass at the end
        full_repeats, remainder = divmod(wrap_needed, len_original)
        repeats = np.tile(original, full_repeats)
        tail = original[:remainder]
        desired_list = np.concatenate((head, repeats, tail))
    else:
        desired_list = original[begin:begin + length]

    return desired_list

### Tests of `wraparound_extract()`

In [5]:
import numpy.testing as npt

# test zero beginning, stopping before the end of the original array
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 0, length = 1), np.array([0]))

# test zero beginning, reaching exactly the end of the original array (no wrap)
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 0, length = 2), np.array([0, 1]))

# test zero beginning, not wrapping (same inputs as the previous check)
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 0, length = 2), np.array([0, 1]))

# test zero beginning, wrapping around
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 0, length = 3), np.array([0, 1, 0]))

# test nonzero beginning, not wrapping
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 1, length = 1), np.array([1]))

# test nonzero beginning, wrapping around
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 1, length = 3), np.array([1, 0, 1]))

# test multiwrap: the requested length spans the array several times
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 1, length = 10), np.array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0]))

# test begin past the end of the array: begin is reduced modulo the
# array length (5 % 2 == 1), so this matches begin = 1
npt.assert_array_equal(wraparound_extract(original = np.array([0, 1]), begin = 5, length = 3), np.array([1, 0, 1]))



### Main function: `get_chunk()`

In [6]:
def get_chunk(
    samples, 
    sample_rate,
    start_position = None, # randomize start position
    duration = 5, # 5 seconds
    duration_jitter = 0.5, #jitter duration +- 0.5s
    chance_random_skip = 0.3 #randomly skip 30% of the time
):
    '''
    Extracts chunk of audio with some augmentation
    
    Extracts samples of audio from a master list
    of samples. 
    
    Available data augmentation options include:
        - selecting a position to start extracting from
          or allowing function to randomly choose start
        - selecting duration of chunk and allowing
          for random jitter of duration
        - randomly skipping some number of samples from
          0 to the length of the chunk
    
    If the chunk to be extracted reaches the end of the
    samples, the chunk will "wrap around" and start
    reading from the beginning of the samples.
    
    Args:
        samples (numpy.array): audio samples loaded
            by librosa.load or audio.load
        sample_rate (int or float): sample rate of `samples`
        start_position (int): position in the file to start
            extracting samples from. If None, the start position 
            is chosen randomly. An explicit 0 starts at the
            first sample.
        duration (float): desired duration, in seconds, 
            of chunk to extract
        duration_jitter (float): if this value is not 0,
            the duration of the chunk extracted will 
            be randomly selected from the range 
            (duration - duration_jitter, duration + duration_jitter).
            A jittered duration below 0 is treated as 0.
        chance_random_skip (float between 0 and 1):
            percent chance of random skipping. In a random skip,
            a position within the chunk will be randomly
            selected, and from that position in the 
            audio file, a random number of samples will 
            be skipped. The number of samples skipped is between
            0 and the number of samples in the entire chunk
    
    Returns:
        numpy.array containing the extracted samples. Its length is
        int(jittered duration * sample_rate), whether or not a
        random skip happened.
    '''
    
    # Get a random start position. Compare against None explicitly so
    # that a caller-supplied start_position of 0 is honoured.
    num_samples = len(samples)
    if start_position is None:
        start_position = random.randint(0, num_samples)

    # Convert seconds to samples; clamp so that a jitter larger than
    # the duration cannot produce a negative sample count
    seconds_to_extract = duration + random.uniform(-duration_jitter, duration_jitter)
    samples_to_extract = max(0, int(seconds_to_extract * sample_rate))
    
    # Get chunks with skip in the middle with probability = chance_random_skip
    if random.random() < chance_random_skip:
        position_to_skip = random.randint(0, samples_to_extract)
        amount_to_skip = random.randint(0, samples_to_extract)

        # wraparound_extract() takes (array, begin, length), so each
        # part is described by a start position and a LENGTH:
        #   part 1: the samples before the skip point
        #   part 2: the rest of the chunk, read after the skipped span
        chunk_1_length = position_to_skip
        chunk_2_start = start_position + position_to_skip + amount_to_skip
        chunk_2_length = samples_to_extract - position_to_skip
        
        chunk_1 = wraparound_extract(samples, start_position, chunk_1_length)
        chunk_2 = wraparound_extract(samples, chunk_2_start, chunk_2_length)
        chunk = np.concatenate((chunk_1, chunk_2))
    
    # Otherwise get contiguous chunk
    else:
        chunk = wraparound_extract(samples, start_position, samples_to_extract) 
    
    return chunk
    

# Smoke run with default options: start_position is None, so the start is
# chosen randomly, and the displayed values differ from run to run.
get_chunk(samples = samples, sample_rate = sr)

array([-6.1611704e-06,  1.2375080e-05, -1.8404073e-05, ...,
        1.4923733e-05, -2.8538039e-05,  6.9725588e-06], dtype=float32)

### TODO: tests for `get_chunk()`

# Cyclic shift

In [7]:
def cyclic_shift(array, split_point = None):
    '''
    Shift array cyclicly by a random amount
    
    Shift array cyclicly by a random amount. Equivalent to
    splitting array into two parts at a random element, then
    switching the order of the parts.
    
    Args: 
        array (np.array): 1D-array to be split
        split_point (float): fraction of the array length at
            which to split -- for testing purposes. The split
            index is int(split_point * length), so 0 leaves the
            array unshifted. For stochastic splitting, leave as None.
    
    Returns:
        shifted_array: shifted array
    '''
    
    assert(type(array) == np.ndarray)
    length = array.shape[0]
    
    # Stochastic split index when no split_point is given. The check is
    # `is None` rather than truthiness so that split_point = 0 is treated
    # as a deterministic "no shift" request instead of a random one.
    if split_point is None:
        split_index = random.randint(0, length)
    # Otherwise split at the floor of split_point * length of array
    else:
        split_index = int(split_point * length)
    
    return np.concatenate((array[split_index:], array[:split_index]))

In [8]:
# Test random splitting. The seed is set immediately before the call so the
# randomly chosen split index is reproducible; the expected array
# corresponds to a split index of 2.
random.seed(100)
npt.assert_array_equal(cyclic_shift(np.array((0, 1, 2, 3, 4, 5, 6, 7))), np.array([2, 3, 4, 5, 6, 7, 0, 1]))

# Test deterministic splitting, odd length: int(0.5 * 3) == 1, so the
# split falls after the first element
npt.assert_array_equal(cyclic_shift(np.array([0, 1, 2]), split_point=0.5), np.array([1, 2, 0]))

# Test deterministic splitting, even length: int(0.5 * 4) == 2, so the
# two halves swap places
npt.assert_array_equal(cyclic_shift(np.array([0, 1, 2, 3]), split_point=0.5), np.array([2, 3, 0, 1]))

# Divided-samples augmentations: time & freq

### Helper function to divide samples randomly `divide_samples()`

In [9]:
def divide_samples(
    samples,
    sample_rate,
    low_duration = 0.5,
    high_duration = 5
):
    '''
    Divide audio samples into random-sized segments
    
    Divide audio samples into random-sized segments
    between the desired durations. The number
    of segments is not deterministic.
    
    Args:
        samples (np.ndarray): 1d array of samples
        sample_rate (int or float): sample rate of samples
        low_duration (float): minimum duration
            in seconds of a segment. The final segment holds
            whatever samples remain, so it may be shorter.
        high_duration (float): maximum duration
            in seconds of any segment. Must correspond to
            at least one sample.
    
    Returns:
        segments, list of non-empty sample arrays which, concatenated
        in order, reproduce `samples`. Empty input gives an empty list.
    
    Raises:
        ValueError: if the durations convert to a negative minimum,
            a maximum below one sample, or a minimum above the maximum
    '''
    
    min_chunk = int(low_duration * sample_rate)
    max_chunk = int(high_duration * sample_rate)
    
    # A maximum of zero samples could never consume the input (the loop
    # below would not terminate), and a negative size would make
    # np.split count from the end of the array.
    if min_chunk < 0 or max_chunk < 1 or min_chunk > max_chunk:
        raise ValueError(
            'durations must satisfy 0 <= low_duration <= high_duration '
            'and high_duration must span at least one sample')
    
    samples_to_take = samples.copy()
    
    segments = []
    
    while samples_to_take.shape[0]:
        seg_size = random.randint(min_chunk, max_chunk)
        
        # A zero-sized draw (possible when min_chunk == 0) would only
        # add an empty segment; draw again instead.
        if seg_size == 0:
            continue
        
        # Splitting past the end is fine: the segment is then simply
        # whatever is left and the remainder becomes empty.
        segment, samples_to_take = np.split(samples_to_take, [seg_size])
        segments.append(segment)
    
    return segments
    

In [10]:
# Test chunk division at set amount: with low_duration == high_duration
# every segment size is fixed at 3 samples, and the final segment is the
# single leftover sample.
array0 = np.array([0, 0, 0])
array1 = np.array([1, 1, 1])
array2 = np.array([2])
all_arrays = (array0, array1, array2)
cat_arrays = np.concatenate(all_arrays)
results = divide_samples(cat_arrays, sample_rate=1, low_duration=3, high_duration=3)
for idx, result in enumerate(results):
    npt.assert_array_equal(result, all_arrays[idx])
    
# Test random chunk division; the seed is set right before the call so
# the segment sizes drawn are reproducible
random.seed(333)
# Predetermined results with random.seed(333)
predetermined = [np.array([0, 1, 2, 3, 4, 5, 6, 7]), np.array([8, 9])]
results = divide_samples(np.array(range(10)), sample_rate=1, low_duration=0, high_duration=10)
for idx, result in enumerate(results):
    npt.assert_array_equal(result, predetermined[idx])

### Helper function to concatenate divisions: `combine_samples()`

In [11]:
def combine_samples(divided):
    '''
    Join divided sample arrays end to end
    
    Stitches a list of sample arrays back together in
    list order, producing one array. Intended for use after
    the individual divisions have been altered (pitch shifting,
    time stretching, etc.).
    
    Args:
        divided (list of np.ndarrays): the sample arrays to
            join, e.g. the output of divide_samples()
    
    Returns:
        a single array holding every division's samples, in order
    '''
    
    recombined = np.concatenate(divided)
    return recombined

In [12]:
# Test that divided samples can be recombined successfully: dividing and
# then concatenating must reproduce the input exactly.
# Note: this loads the same fixture path as the load cell near the top of
# the notebook and rebinds `samples` and `sr`.
samples, sr = librosa.load('../tests/silence_10s.mp3')
divided = divide_samples(samples, sample_rate=sr, low_duration=0.5, high_duration=4)
npt.assert_array_equal(combine_samples(divided), samples)

### Time stretch the divisions `time_stretch_divisions()`

In [13]:
def time_stretch_divisions(
    divisions,
    chance_per_division = 0.50,
    mean_stretch = 1,
    sd_stretch = 0.05
):
    '''
    Time stretch divisions
    
    Given a list of np.ndarrays, each np.ndarray representing
    audio samples, time stretch each array with some probability. 
    
    Args:
        divisions (list of np.ndarrays): list of np.ndarrays
            where each element of the list is samples from
            an audio file. A list of divisions can be generated 
            with helper functions in this module
        chance_per_division (float between 0 and 1): for
            each division, the chance it will be time-stretched
        mean_stretch (float): the mean stretch multiplier.
            == 1 is no stretch; > 1 is sped up, < 1 is slowed down
        sd_stretch (float >= 0): the sd of the stretch 
            distribution. 
    
    Returns:
        stretched_divisions, list with one entry per input division,
        in the same order. Divisions that were not selected, or whose
        drawn stretch multiplier was not positive, are returned as-is.
    '''
    stretched_divisions = []
    
    for d in divisions:
        stretched_d = d
        
        if random.random() < chance_per_division:
            stretch_factor = np.random.normal(
                loc = mean_stretch,
                scale = sd_stretch)
            
            # A normal draw can be zero or negative, which is not a
            # usable stretch rate; leave the division unstretched then.
            if stretch_factor > 0:
                stretched_d = librosa.effects.time_stretch(y = d, rate = stretch_factor)
        
        stretched_divisions.append(stretched_d)
    
    return stretched_divisions

In [14]:
# Demo: divide the loaded audio into random segments, then print the raw
# divisions followed by the result of time_stretch_divisions() so the two
# lists can be compared by eye. No seed is set in this cell, so the output
# varies between runs.
divs = divide_samples(
    samples = samples,
    sample_rate = sr,
    low_duration = 0.5,
    high_duration = 4)
print(divs)
print()
print(time_stretch_divisions(divs))

[array([-3.1739323e-06, -1.4155360e-05, -8.0013604e-07, ...,
        5.1449551e-06, -4.9028572e-06,  2.2106809e-05], dtype=float32), array([-2.2841281e-05,  2.0544954e-05, -1.4502888e-05, ...,
       -1.6800034e-06, -1.1951680e-05,  5.1101538e-06], dtype=float32), array([ 3.8452645e-06, -1.3770463e-05,  2.2396178e-05, ...,
       -2.1374344e-06,  1.4918282e-05, -6.5673667e-06], dtype=float32), array([ 2.7424021e-05, -3.0760748e-05, -7.1096433e-06, ...,
        2.3299851e-06, -1.1209999e-06, -1.9890509e-05], dtype=float32)]

[array([-3.2555515e-06, -1.4118433e-05, -7.7664885e-07, ...,
        3.2824366e-06,  8.8522086e-07, -1.3140799e-07], dtype=float32), array([-2.2841281e-05,  2.0544954e-05, -1.4502888e-05, ...,
       -1.6800034e-06, -1.1951680e-05,  5.1101538e-06], dtype=float32), array([ 4.0471245e-06, -1.3970946e-05,  2.2610662e-05, ...,
        1.1275706e-06,  7.0119158e-06, -3.3273775e-06], dtype=float32), array([ 2.7424021e-05, -3.0760748e-05, -7.1096433e-06, ...,
        2.329

### Frequency shift the divisions `pitch_shift_divisions()`

In [15]:
def pitch_shift_divisions(
    divisions,
    sample_rate,
    chance_per_division = 0.40,
    mean_shift = 0,
    sd_shift = 0.25
):
    '''
    Pitch shift divisions
    
    Walks a list of np.ndarrays of audio samples and, with
    some probability per array, replaces the array with a
    pitch-shifted version. Both mean_shift and sd_shift are
    expressed in "fractional half-steps," e.g.
    0.25 = 1/4th of a half-step = 25 cents.
    
    Args:
        divisions (list of np.ndarrays): sample arrays, one per
            division of an audio file, such as those produced
            by the helper functions in this module
        sample_rate (int or float): sample rate shared by
            all divisions
        chance_per_division (float between 0 and 1): for
            each division, the chance it will be pitch-shifted
        mean_shift (float): mean of the shift distribution,
            in (fractional) half-steps.
            == 0 is no shift; > 0 is shift up; < 0 is shift down
        sd_shift (float > 0): sd of the shift distribution,
            in (fractional) half-steps
    
    Returns:
        shifted_divisions, list with one entry per input division,
        in the same order; unselected divisions are returned as-is
    '''
    
    def shift_or_keep(division):
        # One uniform draw decides whether this division is shifted
        if random.random() < chance_per_division:
            # Amount of shift for this division, drawn from the
            # normal distribution described by the arguments
            n_steps = np.random.normal(
                loc = mean_shift,
                scale = sd_shift)
            return librosa.effects.pitch_shift(
                y = division,
                sr = sample_rate,
                n_steps = n_steps)
        return division
    
    return [shift_or_keep(division) for division in divisions]

In [16]:
# Demo: re-divide the loaded audio (rebinding `divs`), then print the raw
# divisions followed by the result of pitch_shift_divisions() so the two
# lists can be compared by eye. No seed is set in this cell, so the output
# varies between runs.
divs = divide_samples(
    samples = samples,
    sample_rate = sr,
    low_duration = 0.5,
    high_duration = 4)
print(divs)
print()
print(pitch_shift_divisions(divs, sample_rate = sr))

[array([-3.1739323e-06, -1.4155360e-05, -8.0013604e-07, ...,
       -3.3414140e-06, -1.6852617e-05,  2.4009105e-05], dtype=float32), array([-1.4325174e-05,  5.6216527e-06, -1.4833859e-06, ...,
        8.9725813e-07,  1.0267116e-05,  1.3058184e-06], dtype=float32), array([-1.0680385e-06,  2.3093113e-05, -4.1025992e-06, ...,
       -4.4186345e-06,  7.7120267e-06, -7.1442760e-06], dtype=float32), array([ 1.2961112e-05, -1.9446245e-05,  1.8699564e-05, ...,
       -1.0537792e-05,  1.7019402e-05, -1.0489415e-05], dtype=float32), array([-1.1029069e-05,  2.3416687e-05, -1.8281529e-05, ...,
        2.0505417e-05, -3.3454414e-06, -9.0473446e-07], dtype=float32), array([ 3.24280541e-06,  6.52213475e-06,  1.37935285e-05, ...,
        2.32998514e-06, -1.12099985e-06, -1.98905091e-05], dtype=float32)]

[array([-3.1739323e-06, -1.4155360e-05, -8.0013604e-07, ...,
       -3.3414140e-06, -1.6852617e-05,  2.4009105e-05], dtype=float32), array([-1.4325174e-05,  5.6216527e-06, -1.4833859e-06, ...,
       