In [2]:
%load_ext autoreload
%autoreload 2

import tensorflow as tf
from tensorflow  import keras
import tensorflow_hub as hub
import numpy as np
from util import open_slideshow


import librosa 
import matplotlib.pyplot as plt
from tqdm import tqdm
from config import *
import h5py
import soundfile as sf
from scipy.signal import resample_poly
from util import DEFAULT_TOKENS
import librosa

2024-09-25 14:02:59.349702: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-25 14:02:59.349740: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-25 14:02:59.350813: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-25 14:02:59.357687: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [11]:
# Notebook configuration. INTERMEDIATE, ANNOTATIONS and `pd` are not imported
# explicitly in this notebook -- presumably they come from `from config import *`
# (TODO confirm).
# NOTE(review): SR and OVERLAP_THRESH are not referenced by any cell visible in
# this notebook; the later cells hard-code 22_000 as the sample rate.
SR = 16_000
EMBEDDS = INTERMEDIATE / 'embeddings_20p.hdf5'
SAMPLES = INTERMEDIATE / '22sr_samples.hdf5'
OVERLAP_THRESH =  0.20 

# Raw samples are opened read-only. The embeddings file is opened in append mode
# because a later cell writes a "Y_strict" dataset into each recording group.
# Both handles stay open for the life of the notebook and must be closed explicitly.
samples_f = h5py.File(SAMPLES, 'r')
embedds_f = h5py.File(EMBEDDS, 'a')

# NOTE(review): allow_pickle=True executes arbitrary code when loading an untrusted
# .npy file -- only keep this for files produced by this project.
all_recs = np.load(ANNOTATIONS / 'manual_annotations' / 'all_annotated_recordings_filtered.npy', allow_pickle=True)
annotations = pd.read_csv(ANNOTATIONS / 'manual_annotations' / 'initial_manual_annotations.csv')
# Recordings that have at least one row in the manual annotation CSV.
annotated_recordings = annotations.recording.unique()

In [4]:
def get_label_bounds(some_labelled_sample, label_idx, Y_samples):
    """Return the two consecutive change points of a label row that bracket a sample.

    Args:
        some_labelled_sample: sample index to look up.
        label_idx: row of ``Y_samples`` to inspect.
        Y_samples: 2-D array indexed as ``[label_idx, sample]``
            (presumably a per-sample label mask -- TODO confirm).

    Returns:
        ``(lower, upper)`` with ``lower <= some_labelled_sample < upper``, where both
        values are neighbouring positions at which the row changes value, or ``None``
        when fewer than two change points exist or no pair brackets the sample.

    Side effects:
        Prints the array of change points (diagnostic output).
    """
    # np.diff flags the element *before* each change, hence the +1 shift.
    transitions = np.where(np.diff(Y_samples[label_idx, :]))[0] + 1
    print(transitions)

    # Walk neighbouring change points; with fewer than two there is nothing to pair.
    for lower, upper in zip(transitions[:-1], transitions[1:]):
        if lower <= some_labelled_sample < upper:
            return (lower, upper)

    return None
    
    
# test
# Smoke-test get_label_bounds on the first annotated recording.
rec = annotated_recordings[0]
# 'X' and 'Y' are read fully into memory from the samples HDF5 group for this recording.
s_22, Y_samples = np.array(samples_f[rec]['X']), np.array(samples_f[rec]['Y'])
# Y_samples is indexed [label, sample] by get_label_bounds, so after the transpose
# `rows` holds sample indices and `cols` holds label indices of the non-zero entries.
rows, cols = np.where(Y_samples.T)
# NOTE(review): 25246+1 is a hand-picked position in the non-zero list; it is only
# valid for this particular recording -- confirm before reusing.
print('starts at ', rows[25246+1])
get_label_bounds(rows[25246+1], cols[25246+1], Y_samples)


starts at  9661834
[9625586 9635706 9642160 9654186 9658733 9670173 9676773 9686893 9694226
 9706253]


(9658733, 9670173)

In [5]:
# Fraction of a label's own duration that must fall inside a frame for the frame to
# be marked, per label type. None means the label type uses an absolute-duration
# rule instead (see fill_Y_with_label).
PERCENTAGES = {
    'fast_trill_6khz': None,
    'nr_syllable_3khz': 0.90, 
    'triangle_3khz': 0.80,
    'upsweep_500hz': 0.80,
}

def fill_Y_with_label(n_frames, label_type, labels,
                      frame_length=0.96, step_size=0.48, min_trill_seconds=0.6):
    """Build a per-frame 0/1 row marking frames that sufficiently contain a label.

    Frame ``i`` covers ``[i * step_size, i * step_size + frame_length]`` seconds.
    A frame is marked when, for at least one label ``(s, e)``:

    * the label lies entirely within the frame (edges inclusive), or
    * the frame lies entirely within the label (edges inclusive), or
    * the label starts or ends inside the frame and the part from that edge to the
      frame boundary is large enough: more than ``min_trill_seconds`` for
      ``'fast_trill_6khz'``, otherwise more than ``PERCENTAGES[label_type]`` of the
      label's duration.

    Args:
        n_frames: number of frames in the output row.
        label_type: key of ``PERCENTAGES``.
        labels: iterable of ``(start_seconds, end_seconds)`` pairs.
        frame_length: frame duration in seconds.
        step_size: hop between consecutive frame starts in seconds.
        min_trill_seconds: absolute overlap required for ``'fast_trill_6khz'``.

    Returns:
        ``numpy`` float array of length ``n_frames`` holding 0.0 / 1.0.

    Raises:
        KeyError: if ``label_type`` is not a key of ``PERCENTAGES``.
    """
    # Loop-invariant: how much of the label is required to be within the frame.
    percent_required = PERCENTAGES[label_type]
    uses_absolute_rule = label_type == 'fast_trill_6khz'
    labels = list(labels)
    Y_row = np.zeros(n_frames)

    def _enough(covered, s, e):
        # `covered` is the span between the label edge inside the frame and the
        # opposite frame boundary.
        if uses_absolute_rule:
            return covered > min_trill_seconds
        return covered / (e - s) > percent_required

    # frames across the recording
    for i in range(n_frames):
        frame_start = i * step_size
        frame_end = frame_start + frame_length

        for s, e in labels:
            # Inclusive comparisons: a label that begins exactly on a frame edge
            # (e.g. at t=0.0) must still count as covering / being inside the frame.
            contained = (frame_start <= s <= frame_end) and (frame_start <= e <= frame_end)
            encapsulating = (s <= frame_start) and (frame_end <= e)

            if contained or encapsulating:
                # Checked first so zero-length labels never reach the ratio below
                # (which would divide by zero).
                marked = True
            elif frame_start < s < frame_end:
                # label starts in this frame and runs past its end
                marked = _enough(frame_end - s, s, e)
            elif frame_start < e < frame_end:
                # label started before this frame and ends inside it
                marked = _enough(e - frame_start, s, e)
            else:
                marked = False

            if marked:
                Y_row[i] = 1
                break

    return Y_row


def get_label_pairs(label_df):
    """Turn the ``min_t`` / ``max_t`` columns of a frame into ``(start, end)`` tuples.

    Both columns are cast to float first. The result is a list with one tuple per
    row, in row order.
    """
    starts, ends = (
        np.array(label_df[column].astype(float))
        for column in ("min_t", "max_t")
    )
    return [(start, end) for start, end in zip(starts, ends)]


def compute_frame_labels(rec, annotations_df):
    """Compute the boolean (4, n_frames) label matrix for one recording.

    The frame count is derived from the length of ``samples_f[rec]['X']`` assuming
    22_000 samples per second and a 0.48 s hop. Each row is filled by
    ``fill_Y_with_label`` for the label whose index in ``DEFAULT_TOKENS`` matches the
    row. Recordings without any row in ``annotations_df`` yield an all-False matrix.
    """
    duration_seconds = len(samples_f[rec]['X']) / 22_000
    n_frames = int(duration_seconds / 0.48)
    Y_frames = np.zeros(shape=(4, n_frames), dtype=bool)

    # only recordings that appear in the annotation table get any positive frames
    if rec in annotations_df.recording.unique():
        rec_df = annotations_df[annotations_df.recording == rec]
        for label, label_index in DEFAULT_TOKENS.items():
            # (start, end) times of every annotation of this label in the recording
            pairs = get_label_pairs(rec_df[rec_df.label == label])
            Y_frames[label_index] = fill_Y_with_label(n_frames, label, pairs)

    return Y_frames


# test this ^^^
# Sanity check on the second annotated recording: display the matrix shape, the
# number of positive frames, and the first 15 frames of every label row.
rec = annotated_recordings[1]
Y_frames = compute_frame_labels(rec, annotations)
Y_frames.shape, Y_frames.sum(), Y_frames[:, :15]

((4, 1250),
 20,
 array([[False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False],
        [ True,  True, False,  True,  True, False,  True, False, False,
          True,  True, False,  True,  True, False],
        [False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False],
        [False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False]]))

In [6]:
# Load the 'Y' matrix already stored in the embeddings file for the recording held
# in `rec` (set by an earlier cell) and display the same summary as for Y_frames --
# presumably to compare the stored labels with the freshly computed ones.
y = np.array(embedds_f[rec]['Y'])
y.shape, y.sum(), y[:, :15]

((4, 1250),
 33,
 array([[False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False],
        [ True,  True,  True,  True,  True, False,  True,  True,  True,
          True,  True,  True,  True,  True,  True],
        [False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False],
        [False, False, False, False, False, False, False, False, False,
         False, False, False, False, False, False]]))

In [None]:
print('n# annotated recs: ', len(annotated_recordings))
print('n# total \'annotated\' recs: ', len(all_recs))
print('n# of annotations: ', len(annotations))

from util import DEFAULT_TOKENS
order = list(DEFAULT_TOKENS.keys())
IMAGE_FOLDER = Path('../untracked/labeltest')
# parents=True so the cell also works when '../untracked' does not exist yet
IMAGE_FOLDER.mkdir(parents=True, exist_ok=True)

# one second of audio (at 22 kHz) is shown as context on either side of each frame
widen = 1 * 22_000

# Save one small spectrogram per positively-labelled frame, grouped in a folder per
# label, for the first 30 annotated recordings.
for rec in tqdm(annotated_recordings[:30]):

    samples = samples_f[rec]['X']
    n_samples = len(samples)

    # get labelled frames from above; label_frames is (n_labels, n_frames), so
    # np.where yields (label index, frame index) pairs
    label_frames = compute_frame_labels(rec, annotations)
    label_idxs, frame_idxs = np.where(label_frames)

    for c, r in zip(label_idxs, frame_idxs):
        start_time = r * 0.48
        beginning_sample = int(librosa.time_to_samples(start_time, sr=22_000))
        end_sample = beginning_sample + int(librosa.time_to_samples(1, sr=22_000))

        # Clamp to the recording: a negative start would otherwise be interpreted
        # relative to the end of the data and give an empty or wrong segment for
        # frames in the first second.
        segment_start = max(0, beginning_sample - widen)
        segment_end = min(n_samples, end_sample + widen)
        segment = samples[segment_start:segment_end]
        if len(segment) == 0:
            continue

        # plot spec -- the STFT is complex, so convert its magnitude to dB
        fig = plt.figure(figsize=(2, 3))
        S = librosa.amplitude_to_db(np.abs(librosa.stft(segment)), ref=np.max)
        librosa.display.specshow(S)

        # add lines around frame, positioned relative to the (possibly clamped)
        # start of the plotted segment
        frame_lines = librosa.samples_to_frames(
            [beginning_sample - segment_start, end_sample - segment_start]
        )
        plt.vlines(frame_lines, ymin=0, ymax=500)

        # save fig
        plt.title(order[c])
        path = IMAGE_FOLDER / order[c]
        path.mkdir(exist_ok=True)
        plt.savefig(path / f'{rec}_r={r}.png')
        # close (not just clear) so figures do not accumulate across iterations
        plt.close(fig)
    

In [12]:
print('n# annotated recs: ', len(annotated_recordings))
print('n# total \'annotated\' recs: ', len(all_recs))
print('n# of annotations: ', len(annotations))

from util import DEFAULT_TOKENS
order = list(DEFAULT_TOKENS.keys())

# Store the strict frame labels next to the embeddings of every recording.
for rec in tqdm(list(embedds_f)):
    label_frames = compute_frame_labels(rec, annotations)
    rec_group = embedds_f[rec]
    # The file is opened in append mode, so "Y_strict" may survive from an earlier
    # run; create_dataset raises on an existing name, so drop the stale copy first
    # to keep this cell re-runnable.
    if "Y_strict" in rec_group:
        del rec_group["Y_strict"]
    rec_group.create_dataset("Y_strict", data=label_frames, dtype=bool)


n# annotated recs:  101
n# total 'annotated' recs:  354
n# of annotations:  1981


  4%|▎         | 13/354 [00:00<00:02, 118.77it/s]

100%|██████████| 354/354 [00:01<00:00, 204.81it/s]


In [13]:
# Release both HDF5 handles opened in the configuration cell; closing the
# embeddings file also flushes the newly written datasets to disk.
embedds_f.close()
samples_f.close()