<a href="https://colab.research.google.com/github/satvik-venkatesh/you-only-hear-once/blob/main/music-speech-detection-example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
# Shell escape: fetch the YOHO repository into ./you-only-hear-once.
!git clone https://github.com/satvik-venkatesh/you-only-hear-once.git

Cloning into 'you-only-hear-once'...
remote: Enumerating objects: 69, done.[K
remote: Counting objects: 100% (69/69), done.[K
remote: Compressing objects: 100% (64/64), done.[K
remote: Total 69 (delta 31), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (69/69), done.


In [3]:
import librosa
import numpy as np
import tensorflow as tf
import math
import soundfile as sf
import os

In [4]:
# os.cpu_count() returns None when the number of CPUs cannot be determined;
# fall back to 1 so the comparison below cannot raise a TypeError.
NUM_THREADS = os.cpu_count() or 1
if NUM_THREADS > 1:
    # Allow TensorFlow to use every core, both inside a single op (intra)
    # and across independent ops (inter).
    tf.config.threading.set_intra_op_parallelism_threads(NUM_THREADS)
    tf.config.threading.set_inter_op_parallelism_threads(NUM_THREADS)

In [5]:
# One entry per depthwise-separable block, in the order the blocks are stacked.
# The depthwise convolution of a block uses `kernel` and `stride`; the 1x1
# pointwise convolution that follows it outputs `num_filters` channels.
LAYER_DEFS = [
    # (kernel, stride, num_filters)
    ([3, 3], 1,   64),
    ([3, 3], 2,  128),
    ([3, 3], 1,  128),
    ([3, 3], 2,  256),
    ([3, 3], 1,  256),
    ([3, 3], 2,  512),
    ([3, 3], 1,  512),
    ([3, 3], 1,  512),
    ([3, 3], 1,  512),
    ([3, 3], 1,  512),
    ([3, 3], 1,  512),
    ([3, 3], 2, 1024),
    ([3, 3], 1, 1024),
    ([3, 3], 1, 512),
    ([3, 3], 1, 256),
    ([3, 3], 1, 128),
]

In [6]:
"""
Manually define YOHO
"""

# params = yamnet_params.Params()
# Input: one log-Mel patch of 801 frames x 64 bands with a single channel.
m_features = tf.keras.Input(shape=(801, 64, 1), name="mel_input")

# Settings shared by every convolution / batch-norm layer in the network.
keras_layers = tf.keras.layers
conv_args = dict(padding='same', use_bias=False, activation=None)
batch_norm_args = dict(center=True, scale=False, epsilon=1e-4)

# Stem: ordinary strided 3x3 convolution -> batch norm -> ReLU.
X = m_features
X = keras_layers.Conv2D(filters=32, kernel_size=[3, 3], strides=2, name="layer1/conv", **conv_args)(X)
X = keras_layers.BatchNormalization(name="layer1/bn", **batch_norm_args)(X)
X = keras_layers.ReLU(name="layer1/relu")(X)

# Depthwise-separable blocks; the stem is layer 1, so block i is named layer(i + 2).
for i, (kernel, stride, num_filters) in enumerate(LAYER_DEFS):
  prefix = "layer" + str(i + 2)

  # Spatial filtering, one filter per input channel.
  X = keras_layers.DepthwiseConv2D(kernel_size=kernel, strides=stride, depth_multiplier=1,
                                   name=prefix + "/depthwise_conv", **conv_args)(X)
  X = keras_layers.BatchNormalization(name=prefix + "/depthwise_conv/bn", **batch_norm_args)(X)
  X = keras_layers.ReLU(name=prefix + "/depthwise_conv/relu")(X)

  # 1x1 convolution that mixes channels and sets the block's output width.
  X = keras_layers.Conv2D(filters=num_filters, kernel_size=[1, 1], strides=1,
                          name=prefix + "/pointwise_conv", **conv_args)(X)
  X = keras_layers.BatchNormalization(name=prefix + "/pointwise_conv/bn", **batch_norm_args)(X)
  X = keras_layers.ReLU(name=prefix + "/pointwise_conv/relu")(X)

# Fold the two trailing axes into one feature vector per time step.
_, _, sx, sy = X.shape
X = keras_layers.Reshape((-1, int(sx * sy)))(X)

# Head: six sigmoid outputs for every time step.
pred = keras_layers.Conv1D(6, kernel_size=1, activation="sigmoid")(X)

model = tf.keras.Model(name='yamnet_frames', inputs=m_features, outputs=[pred])

In [7]:
# Print the layer-by-layer output shapes and parameter counts as a sanity check.
model.summary()

Model: "yamnet_frames"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 mel_input (InputLayer)      [(None, 801, 64, 1)]      0         
                                                                 
 layer1/conv (Conv2D)        (None, 401, 32, 32)       288       
                                                                 
 layer1/bn (BatchNormalizati  (None, 401, 32, 32)      96        
 on)                                                             
                                                                 
 layer1/relu (ReLU)          (None, 401, 32, 32)       0         
                                                                 
 layer2/depthwise_conv (Dept  (None, 401, 32, 32)      288       
 hwiseConv2D)                                                    
                                                                 
 layer2/depthwise_conv/bn (B  (None, 401, 32, 32)    

In [8]:
# Load the weights stored in YOHO-music-speech.h5 into `model`.
# NOTE(review): the path is relative to the working directory, while the clone
# cell creates ./you-only-hear-once -- confirm the notebook runs from the repo root.
model.load_weights("models/YOHO-music-speech.h5")

In [9]:
def smoothe_events(events):
  """Merge and filter raw detections into a clean, time-sorted event list.

  Events of the same class that overlap, or that are separated by a short
  silence, are fused into one event. Events that are still too short after
  merging are dropped, and the remaining boundaries are rounded to 3 decimals.

  Args:
    events: iterable of ``[start, end, label]`` items with times in seconds.
      Only items labelled ``"speech"`` or ``"music"`` are used. Neither the
      iterable nor its items are modified.

  Returns:
    A new list of ``[start, end, label]`` lists sorted by start time.
  """
  # Largest gap (seconds) that is bridged between two events of one class.
  max_silence = {"speech": 0.8, "music": 0.8}
  # Shortest duration (seconds) an event of each class needs to be kept.
  min_duration = {"speech": 0.8, "music": 3.4}

  # Work on copies so merging never writes into the caller's lists.
  speech_events = []
  music_events = []
  for e in events:
    if e[2] == "speech":
      speech_events.append(list(e))
    elif e[2] == "music":
      music_events.append(list(e))

  def _merge(class_events, gap):
    """Fuse events (already copied) that overlap or lie within `gap` seconds."""
    merged = []
    for ev in sorted(class_events, key=lambda item: item[0]):
      # A non-positive distance means overlap, so a single test covers both
      # the "overlapping" and the "short silence" case.
      if merged and ev[0] - merged[-1][1] <= gap:
        merged[-1][1] = max(ev[1], merged[-1][1])
      else:
        merged.append(ev)
    return merged

  # Music before speech: the final stable sort keeps that order on equal starts.
  candidates = (_merge(music_events, max_silence["music"])
                + _merge(speech_events, max_silence["speech"]))

  # The duration test uses the unrounded times; rounding happens afterwards.
  smooth_events = [
      ev for ev in candidates
      if not (ev[1] - ev[0] < min_duration[ev[2]])
  ]
  for ev in smooth_events:
    ev[0] = round(ev[0], 3)
    ev[1] = round(ev[1], 3)

  smooth_events.sort(key=lambda item: item[0])

  return smooth_events

In [10]:
def get_log_melspectrogram(audio, sr = 16000, hop_length = 160, win_length = 400, n_fft = 512, n_mels = 64, fmin = 125, fmax = 7500):
    """Compute a Mel power spectrogram of `audio` and convert it to decibels.

    The signal and every framing / filter-bank setting are forwarded unchanged
    to `librosa.feature.melspectrogram` (computed as float32); the result is
    then passed through `librosa.core.power_to_db` with `amin=1e-7`.
    """
    mel_settings = dict(
        sr=sr,
        hop_length=hop_length,
        win_length=win_length,
        n_fft=n_fft,
        n_mels=n_mels,
        fmin=fmin,
        fmax=fmax,
        dtype=np.float32,
    )
    mel_power = librosa.feature.melspectrogram(y=audio, **mel_settings)
    return librosa.core.power_to_db(mel_power, amin=1e-7)

In [11]:
def normalize_audio(data):
    """Standardise a signal to zero mean and unit standard deviation.

    Args:
        data: array-like of audio samples.

    Returns:
        numpy array holding ``(data - mean) / std``. When the standard
        deviation is zero (silent or constant input) the mean-removed signal
        is returned undivided, because dividing by zero would fill the output
        with NaN values.
    """
    data = np.asarray(data)
    md = np.mean(data)
    sd = np.std(data)
    centered = data - md
    if sd == 0:
        # Nothing to scale: every sample equals the mean.
        return centered
    return centered / sd

In [12]:
"""
Make predictions for full audio --- vectorised implementation.
"""
def mk_preds_vector(audio_path, hop_size = 6.0, discard = 1.0, win_length = 8.0, sampling_rate = 22050):
  """Detect speech and music events in a whole audio file (vectorised).

  The file is cut into overlapping windows, each window is turned into a
  log-Mel spectrogram, all windows go through the global `model` in one batch,
  and the per-window predictions are mapped back to absolute time and passed
  to `smoothe_events`.

  Args:
    audio_path: path of the audio file; loaded as mono at its native rate.
    hop_size: distance in seconds between the starts of consecutive windows.
    discard: margin in seconds. The first window (when more windows follow)
      keeps events only up to ``win_length - discard``; every later window
      keeps events only from ``discard`` seconds after its own start.
    win_length: duration in seconds of one analysis window.
    sampling_rate: rate in Hz at which the file is windowed; each window is
      then resampled to 16 kHz before the spectrogram is computed.

  Returns:
    The list of ``[start, end, label]`` events returned by `smoothe_events`.
  """
  in_signal, in_sr = librosa.load(audio_path, mono=True, sr=None)
  in_signal = normalize_audio(in_signal)

  # Bring the file to the rate used for windowing.
  in_signal = librosa.resample(in_signal, orig_sr=in_sr, target_sr=sampling_rate)
  audio_clip_length_samples = in_signal.shape[0]

  hop_size_samples = int(hop_size * sampling_rate)
  win_length_samples = int(win_length * sampling_rate)

  # Always analyse at least one window: for clips shorter than
  # win_length - hop_size the ceil() term drops to -1 or below, which would
  # otherwise give zero windows and an empty batch for the model.
  n_preds = max(1, int(math.ceil((audio_clip_length_samples - win_length_samples) / hop_size_samples)) + 1)

  # Zero-pad the signal so that the last window is complete.
  in_signal_pad = np.zeros(((n_preds - 1) * hop_size_samples) + win_length_samples)
  in_signal_pad[0:audio_clip_length_samples] = in_signal

  # One (801, 64) log-Mel patch per window.
  mss_in = np.zeros((n_preds, 801, 64))
  for i in range(n_preds):
    first_sample = i * hop_size_samples
    seg = in_signal_pad[first_sample:first_sample + win_length_samples]
    seg = librosa.util.normalize(seg)
    # Resample from the windowing rate (not a fixed 22050 Hz) so that a
    # non-default `sampling_rate` is handled consistently.
    seg = librosa.resample(seg, orig_sr=sampling_rate, target_sr=16000)
    mss_in[i, :, :] = get_log_melspectrogram(seg).T

  preds = model.predict(mss_in)

  # Duration in seconds covered by one output time step of the model.
  win_width = win_length / preds.shape[1]

  events = []
  for j in range(n_preds):
    window_offset = j * hop_size

    # Absolute time range inside which events of this window are kept.
    if j == 0:
      start = 0.0
      end = start + win_length
      if preds.shape[0] > 1:
        end -= discard
    elif j == n_preds - 1:
      start = window_offset + discard
      end = start - discard + win_length
    else:
      start = window_offset + discard
      # NOTE(review): this evaluates to window_offset + win_length, i.e. the
      # trailing margin is not trimmed here although it is for the first
      # window -- confirm whether `- 2 * discard` was intended.
      end = start + win_length - discard

    p = preds[j, :, :]
    for i in range(len(p)):
      # Each step holds (presence, relative start, relative duration) for
      # speech in columns 0-2 and for music in columns 3-5.
      for label, col in (("speech", 0), ("music", 3)):
        if p[i][col] >= 0.5:
          ev_start = win_width * i + win_width * p[i][col + 1]
          ev_end = p[i][col + 2] * win_width + ev_start

          # Shift to absolute time and clamp to the kept range.
          ev_start = max(start, ev_start + window_offset)
          ev_end = min(end, ev_end + window_offset)

          # An event lying entirely in a discarded margin ends up with
          # end <= start after clamping; drop it so it cannot act as a
          # bridge when neighbouring events are merged.
          if ev_end > ev_start:
            events.append([ev_start, ev_end, label])

  smooth_events = smoothe_events(events)

  return smooth_events

In [13]:
import IPython
# Show an inline audio player for the test clip (the last expression of the cell is displayed).
IPython.display.Audio("models/test-music-speech.wav")

In [14]:
# Run the detector on a recording; `see` receives the smoothed
# [start, end, label] events.
# Alternative input (the test clip from the audio-player cell):
#see = mk_preds_vector("models/test-music-speech.wav")
see = mk_preds_vector('models/tarefa_145687_trecho.wav')
print(see)

[[27.082, 30.765, 'music'], [57.39, 61.998, 'music'], [68.175, 84.765, 'music'], [89.232, 165.991, 'music'], [167.867, 218.152, 'music'], [223.892, 294.149, 'music'], [295.0, 450.152, 'music'], [453.453, 478.568, 'music'], [507.426, 511.844, 'music'], [543.154, 550.606, 'music'], [553.881, 560.769, 'music'], [565.0, 599.531, 'music'], [601.0, 663.673, 'music'], [667.0, 687.376, 'music'], [692.186, 755.23, 'music'], [756.771, 771.988, 'music'], [774.776, 783.07, 'music'], [784.644, 799.379, 'music'], [841.0, 846.447, 'music'], [847.304, 1035.07, 'music'], [1037.851, 1061.838, 'music'], [1066.022, 1070.462, 'music'], [1078.04, 1083.69, 'music'], [1088.896, 1102.302, 'music'], [1103.235, 1111.993, 'music'], [1113.386, 1144.922, 'music'], [1146.155, 1153.998, 'music'], [1189.0, 1198.92, 'music'], [1200.155, 1221.381, 'music'], [1224.223, 1243.996, 'music'], [1248.463, 1257.078, 'music'], [1258.017, 1265.841, 'music'], [1276.001, 1279.688, 'music'], [1282.098, 1314.154, 'music'], [1315.0, 1

In [15]:
from pyannote.core import Segment, Timeline, Annotation

In [16]:
# Convert the detected events into an Annotation: one labelled Segment per event.
reference = Annotation()
for start, end, label in see:
    reference[Segment(start, end)] = label

In [17]:
# Coarsen the annotation with a 15 s collar.
# NOTE(review): presumably `support` fuses same-label segments whose gap is
# shorter than the collar -- confirm against the installed pyannote.core version.
reference2 = reference.support(collar=15.0)

In [18]:
# Write one tab-separated "start<TAB>end<TAB>label" row per track
# (presumably for import as an Audacity label track, given the file name).
label_row = '{:.5f}\t{:.5f}\t{}\n'
with open('audacity.txt','w') as f:
    for seg, t, label in reference2.itertracks(yield_label=True):
        row = label_row.format(seg.start, seg.end, label)
        f.write(row)