In [None]:
import os
import pickle
from collections import defaultdict
import json

from umap import UMAP
from scipy.ndimage import maximum_filter, gaussian_filter1d
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import numpy as np
import cv2

from utils.list_files import list_files, get_stem
from utils.histogram_equalize import histogram_equalize
from utils.ffmpeg import auread
from utils.imutil import imshow, imresize

from siren import build_fingerprint, build_features, build_targets, to_indices

sr = 44100
hop_length = 512
feature_framerate = sr / hop_length

In [None]:
grouped = {}

for audio_fn in list_files('data', 'wav'):
    stem = get_stem(audio_fn)
    print('analyzing', stem)
    
    y,sr = auread(audio_fn, mono=True)
    amp = build_fingerprint(y, sr)
    features = build_features(y, sr)
    rms = features['rms'].flatten()
    centroid = features['centroid'].flatten()
    flatness = features['flatness'].flatten()
    
#     amp = grouped[stem]['amp']
#     rms = grouped[stem]['rms']
#     centroid = grouped[stem]['centroid']
#     flatness = grouped[stem]['flatness']
    
    n = len(amp)
    stems = [stem] * n
    
    grouped[stem] = {
        'stems': stems,
        'amp': amp,
        'rms': rms,
        'centroid': centroid,
        'flatness': flatness
    }
    
    annotation_fn = f'data/{stem}.Table.1.selections.txt'
    if os.path.exists(annotation_fn):
        print('loading annotation')
        units, singing, themes, unit_positions = build_targets(annotation_fn, y, sr, amp)
    else:
        print('empty annotation')
        units = np.asarray([''] * n, '<U8')
        singing = -np.ones(n, np.int8)
        themes = -np.ones(n, np.int8)
        unit_positions = np.zeros(n, np.float32)
        
    grouped[stem].update({
        'units': units,
        'singing': singing,
        'themes': themes,
        'unit_positions': unit_positions
    })

In [None]:
# create derived data
for stem, group in grouped.items():
    n = len(group['amp'])
    annotated = np.any(group['units'] != '')
    group['annotated'] = np.asarray([annotated] * n)
    
    # take 5 second max filter on rms for fast signal
    rms = group['rms']
    group['local_rms'] = rms / maximum_filter(rms, size=feature_framerate*5)
    
    # 2 second lowpass rms for slow signal
    lowpass_rms = gaussian_filter1d(rms, sigma=feature_framerate*0.5)
    lowpass_rms /= maximum_filter(lowpass_rms, size=feature_framerate*15)
    group['lowpass_rms'] = lowpass_rms

In [None]:
# these can be used to save and restore state between sessions, to save analysis time

def dump_cache(e, name):
    with open(f'cache/{name}.pkl', 'wb') as f:
        return pickle.dump(e, f)
    
def load_cache(name):
    with open(f'cache/{name}.pkl', 'rb') as f:
        return pickle.load(f)
    
dump_cache(grouped, 'grouped')
# grouped = load_cache('grouped')

In [None]:
# ungroup into list of lists
ungrouped = defaultdict(list)
for stem, group in grouped.items():
    for k,v in group.items():
        ungrouped[k].append(v)
        
def stack(arr):
    # hstack 1 dimensional
    if len(np.shape(arr[0])) == 1:
        return np.hstack(arr)
    # vstack two+ dimensional
    return np.vstack(arr)

# stack and convert to flat arrays
ungrouped = {k:stack(v) for k,v in ungrouped.items()}    

In [None]:
# preserves "unknown" categories as -1
def to_indices(arr, unknown=''):
    unique = np.unique(arr)
    if unknown in unique:
        unique = [e for e in unique if e != unknown]
    mapping = {e:i for i,e in enumerate(unique)}
    mapping[unknown] = -1
    mapped = [mapping[e] for e in arr]
    return np.asarray(mapped)

In [None]:
unit_indices = to_indices(ungrouped['units'])

In [None]:
%%time

unique_unit_indices = np.unique(unit_indices)
embed_amp_per_unit_2d = np.zeros((len(unit_indices), 2))
embed_amp_per_unit_1d = np.zeros((len(unit_indices), 1))

for unit_index in unique_unit_indices:
    if unit_index == -1:
        print('skipping unlabeled')
        continue
    print(unit_index)
    valid, = np.where(unit_indices == unit_index)
    amp_sub = ungrouped['amp'][valid]
    embed_amp_sub_per_unit_2d = UMAP(n_components=2).fit_transform(amp_sub)
    embed_amp_sub_per_unit_1d = UMAP(n_components=1).fit_transform(amp_sub)
    embed_amp_per_unit_2d[valid] = histogram_equalize(embed_amp_sub_per_unit_2d, max_val=1.0, endpoint=True)
    embed_amp_per_unit_1d[valid] = histogram_equalize(embed_amp_sub_per_unit_1d, max_val=1.0, endpoint=True)

In [None]:
%%time

# handle the unlabeled data using supervision
embed_amp_unit_2d = UMAP(n_components=2).fit_transform(ungrouped['amp'], unit_indices)
embed_amp_unit_1d = UMAP(n_components=1).fit_transform(ungrouped['amp'], unit_indices)

empty_indices = unit_indices == -1
embed_amp_per_unit_2d[empty_indices] = embed_amp_unit_2d[empty_indices]
embed_amp_per_unit_1d[empty_indices] = embed_amp_unit_1d[empty_indices]

In [None]:
def lab2rgb(brightness, ab):
    lab_colors = (ab * 200) - 100
    lab_colors = np.hstack((brightness * np.ones((len(lab_colors),1)), lab_colors))
    rgb_colors = cv2.cvtColor(lab_colors.reshape(-1,1,3).astype(np.float32), cv2.COLOR_Lab2RGB)
    return rgb_colors

In [None]:
# put back in groups using stem for lookup
for stem, group in grouped.items():    
    indices = ungrouped['stems'] == stem
#     group['embed_amp_theme'] = embed_amp_theme[indices]
#     group['embed_amp_unit'] = embed_amp_unit[indices]
#     group['unit_indices'] = unit_indices[indices]
    group['embed_amp_per_unit_1d'] = embed_amp_per_unit_1d[indices]
    group['embed_amp_per_unit_2d'] = embed_amp_per_unit_2d[indices]
        
dump_cache(grouped, 'grouped')

In [None]:
def blur_and_equalize(x, sigma):
    blurred = gaussian_filter1d(x, sigma, axis=0).T
    for i in range(len(blurred)):
        blurred[i] = histogram_equalize(blurred[i], max_val=1, endpoint=True)
    return blurred.T

for stem, group in grouped.items():
    sigma = feature_framerate * 0.1
    group['embed_amp_per_unit_1d_lowpass'] = blur_and_equalize(group['embed_amp_per_unit_1d'], sigma)
    group['embed_amp_per_unit_2d_lowpass'] = blur_and_equalize(group['embed_amp_per_unit_2d'], sigma)

In [None]:
def linmap(x, xmin, xmax, outmin, outmax):
    return outmin + (outmax - outmin) * (x - xmin) / (xmax - xmin)

def linear_window(x, edge):
    y = np.minimum(1, linmap(x, 0, edge, 0, 1))
    y = np.minimum(y, linmap(x, 1-edge, 1, 1, 0))
    return y

In [None]:
channels = 6
n_lights = 15
global_lights = 3
local_lights = n_lights - global_lights
positions = np.linspace(0, 1, local_lights)

def gaussian(x,x0,sigma):
    return np.exp(-np.power((x - x0)/sigma, 2.)/2.)

def get_lighting(e):
    ab = None
    center = None
    rms = None
    
    colors = np.zeros((n_lights, channels))
    
    if e['annotated']:
        if e['units'] != '':
            ab = e['embed_amp_per_unit_2d']
            center = e['embed_amp_per_unit_1d']
            rms = e['local_rms']
    else:
        ab = e['embed_amp_per_unit_2d_lowpass']
        center = e['embed_amp_per_unit_1d_lowpass']
        rms = e['local_rms']
        
    if ab is not None:
        red, green, blue = lab2rgb(70, ab[np.newaxis,:]).flatten()
        rgbawu = np.array([red, green, blue, red, blue, green])
        weights = gaussian(positions, center, 0.3)[:,np.newaxis]
        colors[-local_lights:] = weights * rgbawu
        colors *= rms
        
        unit_position = e['unit_positions']
        if unit_position > 0:
            colors *= linear_window(unit_position, 0.05)

    colors[:global_lights] = colors[-local_lights:].max(axis=0)

    return colors
    

for stem, group in grouped.items():
    print(stem)
    keys = group.keys()
    design = []
    for e in zip(*group.values()):
        elt = dict(zip(keys, e))
        design.append(get_lighting(elt))
    design = np.asarray(design)
    design = np.clip(design, 0, 1)

    np.save(f'design/{stem}.npy', design)
    
print('done')