In [1]:
# Imports
import numpy as np
from PIL import Image
import requests
from io import BytesIO
import cv2

from keras import backend
from keras.models import Model
from keras.applications.vgg16 import VGG16

from scipy.optimize import fmin_l_bfgs_b
import tensorflow as tf
tf.compat.v1.disable_eager_execution()

import librosa
from librosa import display
import matplotlib.pyplot as plt
%matplotlib inline

import math
import pywt
from scipy import signal

from audiotsm import phasevocoder
from audiotsm.io.wav import WavReader, WavWriter

In [2]:
def read_audio_spectum_extended(filename):
    x, fs = librosa.load(filename)
    S = librosa.stft(x, N_FFT)
    S = np.log1p(np.abs(S)) 
    return S, x, fs

In [3]:
# SOURCE: https://github.com/scaperot/the-BPM-detector-python/blob/master/bpm_detection/bpm_detection.py
def no_audio_data():
    print("No audio data for sample, skipping...")
    return None, None

def peak_detect(data):
    max_val = np.amax(abs(data))
    peak_ndx = np.where(data == max_val)
    if len(peak_ndx[0]) == 0:  # if nothing found then the max must be negative
        peak_ndx = np.where(data == -max_val)
    return peak_ndx

def bpm_detector(data, fs):
    cA = []
    cD = []
    correl = []
    cD_sum = []
    levels = 4
    max_decimation = 2 ** (levels - 1)
    min_ndx = math.floor(60.0 / 220 * (fs / max_decimation))
    max_ndx = math.floor(60.0 / 40 * (fs / max_decimation))

    for loop in range(0, levels):
        cD = []
        # 1) DWT
        if loop == 0:
            [cA, cD] = pywt.dwt(data, "db4")
            cD_minlen = len(cD) / max_decimation + 1
            cD_sum = np.zeros(math.floor(cD_minlen))
        else:
            [cA, cD] = pywt.dwt(cA, "db4")

        # 2) Filter
        cD = signal.lfilter([0.01], [1 - 0.99], cD)

        # 4) Subtract out the mean.

        # 5) Decimate for reconstruction later.
        cD = abs(cD[:: (2 ** (levels - loop - 1))])
        cD = cD - np.mean(cD)

        # 6) Recombine the signal before ACF
        #    Essentially, each level the detail coefs (i.e. the HPF values) are concatenated to the beginning of the array
        cD_sum = cD[0 : math.floor(cD_minlen)] + cD_sum

    if [b for b in cA if b != 0.0] == []:
        return no_audio_data()

    # Adding in the approximate data as well...
    cA = signal.lfilter([0.01], [1 - 0.99], cA)
    cA = abs(cA)
    cA = cA - np.mean(cA)
    cD_sum = cA[0 : math.floor(cD_minlen)] + cD_sum

    # ACF
    correl = np.correlate(cD_sum, cD_sum, "full")

    midpoint = math.floor(len(correl) / 2)
    correl_midpoint_tmp = correl[midpoint:]
    peak_ndx = peak_detect(correl_midpoint_tmp[min_ndx:max_ndx])
    if len(peak_ndx) > 1:
        return no_audio_data()

    peak_ndx_adjusted = peak_ndx[0] + min_ndx
    bpm = 60.0 / peak_ndx_adjusted * (fs / max_decimation)
    #print(bpm)
    return bpm, correl

def get_bpm(raw_samples, fs):
    data = []
    correl = []
    bpm = 0
    n = 0
    nsamps = len(raw_samples)
    window_samps = int(window * fs)
    samps_ndx = 0  # First sample in window_ndx
    max_window_ndx = math.floor(nsamps / window_samps)
    bpms = np.zeros(max_window_ndx)
    
    for window_ndx in range(0, max_window_ndx):

        # Get a new set of samples
        # print(n,":",len(bpms),":",max_window_ndx_int,":",fs,":",nsamps,":",samps_ndx)
        data = raw_samples[samps_ndx : samps_ndx + window_samps]
        if not ((len(data) % window_samps) == 0):
            raise AssertionError(str(len(data)))

        bpm, correl_temp = bpm_detector(data, fs)
        if bpm is None:
            continue
        bpms[window_ndx] = bpm
        correl = correl_temp

        # Iterate at the end of the loop
        samps_ndx = samps_ndx + window_samps

        # Counter for debug...
        n = n + 1

    bpm = np.median(bpms)
    
    return bpm

In [4]:
CONTENT_FILENAME = 'Maid with the Flaxen Hair.wav'
STYLE_FILENAME = 'Kalimba.wav'
STYLE_BPM_MOD_FILENAME = 'KalimbaBPM.wav'

N_FFT = 2048

window = 3

In [5]:
a_content, raw_content, fs_content = read_audio_spectum_extended(CONTENT_FILENAME)
a_style, raw_style, fs_style = read_audio_spectum_extended(STYLE_FILENAME)

In [6]:
bpm_content = get_bpm(raw_content, fs_content);
bpm_style = get_bpm(raw_style, fs_style);
print(bpm_content)
print(bpm_style)

bpm_ratio = bpm_content / bpm_style;
print(bpm_ratio)

No audio data for sample, skipping...
115.81160639192599
119.66353111432706
0.9678103705737973


In [7]:
with WavReader(STYLE_FILENAME) as reader:
    with WavWriter(STYLE_BPM_MOD_FILENAME, reader.channels, reader.samplerate) as writer:
        tsm = phasevocoder(reader.channels, speed=bpm_ratio)
        tsm.run(reader, writer)