In [None]:
import scipy
import matplotlib.pyplot as plt
import os
import ffmpeg
import moviepy.editor as mp
from scipy.io.wavfile import read
import numpy as np
from scipy.fft import fft, ifft
import math
import numpy as np

In [None]:
# ROOT_DIR is the parent of the current working directory; the data folder lives directly under it.
ROOT_DIR = os.path.split(os.getcwd())[0]
DATA_FOLDER = os.path.join(ROOT_DIR, "data")

In [None]:
def _data_file(filename):
    """Return the path of `filename` inside DATA_FOLDER."""
    return os.path.join(DATA_FOLDER, filename)


# Base recordings (video + extracted audio)
video_waltter_path = _data_file("example_waltter.MOV")
video_vikture_path = _data_file("example_vikture.MOV")
audio_waltter_path = _data_file("example_waltter.wav")
audio_vikture_path = _data_file("example_vikture.wav")

# "_early_15s" variants
video_waltter_e15_path = _data_file("example_waltter_early_15s.MOV")
video_vikture_e15_path = _data_file("example_vikture_early_15s.MOV")
audio_waltter_e15_path = _data_file("example_waltter_early_15s.wav")
audio_vikture_e15_path = _data_file("example_vikture_early_15s.wav")

# "_late_15s" variants
video_waltter_l15_path = _data_file("example_waltter_late_15s.MOV")
video_vikture_l15_path = _data_file("example_vikture_late_15s.MOV")
audio_waltter_l15_path = _data_file("example_waltter_late_15s.wav")
audio_vikture_l15_path = _data_file("example_vikture_late_15s.wav")

# "_early_30s" variants
video_waltter_e30_path = _data_file("example_waltter_early_30s.MOV")
video_vikture_e30_path = _data_file("example_vikture_early_30s.MOV")
audio_waltter_e30_path = _data_file("example_waltter_early_30s.wav")
audio_vikture_e30_path = _data_file("example_vikture_early_30s.wav")

In [None]:
def extract_audio(video_path):
    """Open the video at `video_path` with moviepy and return its `audio` attribute."""
    return mp.VideoFileClip(video_path).audio

In [None]:
def plot_audio(data):
    """Draw `data` as a line plot titled "Audio" and show the figure."""
    plt.plot(data)
    plt.title("Audio")
    plt.show()

In [None]:
def stereo_to_mono(wav_array: np.ndarray):
    """Collapse a multi-channel signal to mono by averaging across axis 1.

    Parameters
    ----------
    wav_array : np.ndarray
        Either a 2-D array averaged over its second axis, or a 1-D array
        that is already mono.

    Returns
    -------
    np.ndarray
        The per-row mean for 2-D input. A 1-D input is returned unchanged
        (the same object, not a copy), because `mean(axis=1)` would raise
        on it.
    """
    if wav_array.ndim == 1:
        # Already a single channel: nothing to average.
        return wav_array
    return wav_array.mean(axis=1)

In [None]:
def normalize_wav(wav_array: np.ndarray):
    """Min-max scale a signal so its minimum maps to -1 and its maximum to 1.

    Parameters
    ----------
    wav_array : np.ndarray
        Samples to scale. The input is converted to float first so that
        integer sample types cannot overflow in the subtraction or range
        computation.

    Returns
    -------
    np.ndarray
        Float array of the same shape. An empty input yields an empty array,
        and a constant input (zero value range) yields all zeros instead of
        the NaNs a division by zero would produce.
    """
    samples = np.asarray(wav_array, dtype=float)
    if samples.size == 0:
        return samples.copy()

    value_range = np.ptp(samples)
    if value_range == 0:
        # Constant signal: there is no range to stretch, so avoid dividing by zero.
        return np.zeros_like(samples)

    return 2.0 * (samples - np.min(samples)) / value_range - 1

In [None]:
def contrast_audio(audio_array: np.ndarray):
    """Return the element-wise square of `audio_array`."""
    squared = audio_array * audio_array
    return squared

In [None]:
def preprocess_audio(audio_file_path: str, to_mono=True, normalize=True, contrast=True):
    """Load a WAV file and run the optional preprocessing steps on it.

    Parameters
    ----------
    audio_file_path : str
        Path handed to `scipy.io.wavfile.read`.
    to_mono : bool
        When True, multi-channel data is reduced with `stereo_to_mono`.
        Data that is already 1-D is left as is.
    normalize : bool
        When True, the data is passed through `normalize_wav`.
    contrast : bool
        When True, the data is passed through `contrast_audio`.

    Returns
    -------
    np.ndarray
        The samples as a float array after the selected steps. The sample
        rate read from the file is not returned.
    """
    _sample_rate, audio = read(audio_file_path)
    audio_data = np.array(audio, dtype=float)
    # A mono file is read as a 1-D array; averaging over axis 1 would raise on it.
    if to_mono and audio_data.ndim > 1:
        audio_data = stereo_to_mono(audio_data)
    if normalize:
        audio_data = normalize_wav(audio_data)
    if contrast:
        audio_data = contrast_audio(audio_data)
    return audio_data

In [None]:
# Show the same recording after each preprocessing stage: mono only, then normalized, then normalized + contrasted.
for normalize_flag, contrast_flag in ((False, False), (True, False), (True, True)):
    plot_audio(preprocess_audio(audio_waltter_path, normalize=normalize_flag, contrast=contrast_flag))

In [None]:
# Load every recording with the default preprocessing settings ...
audio_waltter = preprocess_audio(audio_waltter_path)
audio_vikture = preprocess_audio(audio_vikture_path)
audio_waltter_e15 = preprocess_audio(audio_waltter_e15_path)
audio_vikture_e15 = preprocess_audio(audio_vikture_e15_path)
audio_waltter_l15 = preprocess_audio(audio_waltter_l15_path)
audio_vikture_l15 = preprocess_audio(audio_vikture_l15_path)

# ... then plot them one after another, in loading order.
for preprocessed_audio in (
    audio_waltter,
    audio_vikture,
    audio_waltter_e15,
    audio_vikture_e15,
    audio_waltter_l15,
    audio_vikture_l15,
):
    plot_audio(preprocessed_audio)

In [None]:
# The six preprocessed recordings, collected so later cells can loop over them.
audio_samples = [
    audio_waltter,
    audio_vikture,
    audio_waltter_e15,
    audio_vikture_e15,
    audio_waltter_l15,
    audio_vikture_l15,
]

In [None]:
def wav_to_blocks(wav_array: np.ndarray, block_size=882, overlap=441, aggregation_func=np.mean):
    """Reduce a signal to one aggregate value per (optionally overlapping) block.

    Blocks start every `block_size - overlap` samples and span up to
    `block_size` samples; blocks near the end of the array are shorter
    because they are cut off at the array boundary.

    Parameters
    ----------
    wav_array : np.ndarray
        Signal to split into blocks.
    block_size : int
        Maximum number of samples per block.
    overlap : int
        Number of samples shared by consecutive blocks. Must be smaller
        than `block_size`.
    aggregation_func : callable
        Called on each block; its result becomes that block's value.

    Returns
    -------
    np.ndarray
        Float array with one aggregate per block (empty for empty input).

    Raises
    ------
    ValueError
        If `overlap` is not smaller than `block_size`, which would give a
        zero or negative step between blocks.
    """
    step = block_size - overlap
    if step <= 0:
        raise ValueError("overlap must be smaller than block_size")

    array_length = len(wav_array)
    # Collect in a list and convert once; appending to an ndarray copies it on every iteration.
    aggregates = [
        aggregation_func(wav_array[block_start : block_start + block_size])
        for block_start in range(0, array_length, step)
    ]
    # ravel keeps the flat result that repeated np.append calls produced.
    return np.array(aggregates, dtype=float).ravel()

In [None]:
# Plot the block-aggregated version of every recording (default block settings).
for blocks in map(wav_to_blocks, audio_samples):
    plot_audio(blocks)

In [None]:
# `waltter_blocks` is not assigned by any earlier cell, so this cell raised NameError on a fresh kernel.
# NOTE(review): assumed to be the default block aggregation of `audio_waltter` - confirm.
waltter_blocks = wav_to_blocks(audio_waltter)
plot_audio(waltter_blocks)

In [None]:
# Neither block array is assigned by an earlier cell, so they are (re)computed here to keep the cell runnable.
# NOTE(review): assumed to be the default block aggregation of each recording - confirm.
waltter_blocks = wav_to_blocks(audio_waltter)
vikture_blocks = wav_to_blocks(audio_vikture)

# Sorted indices of the 100 largest block values in each recording.
print(sorted(np.argpartition(waltter_blocks, -100)[-100:]))
print(sorted(np.argpartition(vikture_blocks, -100)[-100:]))

In [None]:
# `normalized_waltter` / `normalized_vikture` are not assigned by any earlier cell (NameError on a fresh kernel).
# NOTE(review): assumed to mean "normalized but not contrasted" - confirm the intended preprocessing.
normalized_waltter = preprocess_audio(audio_waltter_path, normalize=True, contrast=False)
normalized_vikture = preprocess_audio(audio_vikture_path, normalize=True, contrast=False)

# Sorted indices of the 1000 largest samples in each recording.
print(sorted(np.argpartition(normalized_waltter, -1000)[-1000:]))
print(sorted(np.argpartition(normalized_vikture, -1000)[-1000:]))

In [None]:
# The inputs were never assigned by an earlier cell (and one name was misspelled "extrame_..."),
# so this cell raised NameError on a fresh kernel.
# NOTE(review): assumed to mean "normalized and contrasted" - confirm the intended preprocessing.
extreme_normalized_waltter = preprocess_audio(audio_waltter_path, normalize=True, contrast=True)
extreme_normalized_vikture = preprocess_audio(audio_vikture_path, normalize=True, contrast=True)

# Non-overlapping blocks of 2205 samples each.
waltter_extreme_blocks = wav_to_blocks(extreme_normalized_waltter, block_size=2205, overlap=0)
print(len(waltter_extreme_blocks))

vikture_extreme_blocks = wav_to_blocks(extreme_normalized_vikture, block_size=2205, overlap=0)
print(len(vikture_extreme_blocks))

In [None]:
# Sorted indices of the 10 largest block values, first for waltter and then for vikture.
for extreme_blocks in (waltter_extreme_blocks, vikture_extreme_blocks):
    top_ten_indices = np.argpartition(extreme_blocks, -10)[-10:]
    print(sorted(top_ten_indices))

In [None]:
# NOTE(review): despite its name, `audio_waltter` is loaded from `audio_vikture_path`, and
# `audio_vikture` from `audio_vikture_l15_path`. This looks like a deliberate comparison of one
# recording against its "late_15s" variant, but it overwrites the variables loaded earlier - confirm.
audio_waltter, audio_vikture = (
    preprocess_audio(wav_path, normalize=True, contrast=True)
    for wav_path in (audio_vikture_path, audio_vikture_l15_path)
)

In [None]:
# Calculate the mean squared error between two audio samples
def audio_mse_score(audio1, audio2, blocks=False, block_size=441):
    """Return the mean squared difference between two equally long signals.

    Parameters
    ----------
    audio1, audio2 : array-like
        Signals of identical length. They are converted to float arrays, so
        plain lists work and integer sample types cannot overflow.
    blocks : bool
        When True, both signals are first reduced with `wav_to_blocks`
        (non-overlapping blocks) and the blocks are compared instead of
        the raw samples.
    block_size : int
        Block length used when `blocks` is True.

    Returns
    -------
    float
        Mean of the squared element-wise differences.

    Raises
    ------
    AssertionError
        If the two inputs differ in length.
    """
    assert len(audio1) == len(audio2), "audio1 and audio2 must have the same length"

    first = np.asarray(audio1, dtype=float)
    second = np.asarray(audio2, dtype=float)
    if blocks:
        first = wav_to_blocks(first, block_size=block_size, overlap=0)
        second = wav_to_blocks(second, block_size=block_size, overlap=0)

    return np.square(first - second).mean()

In [None]:
# Calculate absolute distance between audio samples
def audio_absolute_distance_score(audio1, audio2, blocks=False, block_size=441):
    """Return the summed absolute difference between two equally long signals.

    Parameters
    ----------
    audio1, audio2 : array-like
        Signals of identical length. They are converted to float arrays, so
        plain lists work and integer sample types cannot overflow.
    blocks : bool
        When True, both signals are first reduced with `wav_to_blocks`
        (non-overlapping blocks) and the blocks are compared instead of
        the raw samples.
    block_size : int
        Block length used when `blocks` is True.

    Returns
    -------
    float
        Sum of the absolute element-wise differences.

    Raises
    ------
    AssertionError
        If the two inputs differ in length.
    """
    assert len(audio1) == len(audio2), "audio1 and audio2 must have the same length"

    first = np.asarray(audio1, dtype=float)
    second = np.asarray(audio2, dtype=float)
    if blocks:
        first = wav_to_blocks(first, block_size=block_size, overlap=0)
        second = wav_to_blocks(second, block_size=block_size, overlap=0)

    return np.abs(first - second).sum()

In [None]:
def safe_division(first, second):
    """Divide `first` by `second`, returning 0 whenever either operand equals zero."""
    if first != 0 and second != 0:
        return first / second
    return 0

def _sliding_window_scores(moving_audio, reference_audio, window_size, window_increment, verbose=False):
    """Score each window of `moving_audio` against the first `window_size` samples of `reference_audio`.

    Windows start every `window_increment` samples. Returns a list with one
    `audio_absolute_distance_score` value per window start.
    """
    reference_window = reference_audio[0:window_size]
    windows_amount = math.ceil((len(moving_audio) - window_size) / window_increment)
    # Report roughly every 1 % of the windows; never less than every window, so that
    # inputs with fewer than 100 windows do not trigger a modulo by zero.
    progress_step = max(1, math.floor(windows_amount / 100))

    scores = []
    for i in range(windows_amount):
        window_start = window_increment * i
        # window_start + window_size stays below len(moving_audio) by construction of windows_amount.
        moving_window = moving_audio[window_start : window_start + window_size]
        scores.append(audio_absolute_distance_score(moving_window, reference_window))

        if verbose and i % progress_step == 0:
            print(f"{round(safe_division(i, windows_amount) * 100, 2)}%")

    return scores


# Compare windows from audio1 to audio2 and find the window that minimizes the distance between the audio files
def calculate_distance_plot(audio1, audio2, verbose=False, sample_rate=44100):
    """Estimate the time offset between two recordings with a sliding-window search.

    A window covering half of one recording is slid over that recording and
    compared (summed absolute difference) against the start of the other
    recording. Because it is unknown which recording is delayed, the search
    runs in both directions. Both score curves are plotted, the index of the
    best window in each direction is printed, and the delay implied by the
    overall lowest score is printed in seconds.

    Parameters
    ----------
    audio1, audio2 : np.ndarray
        Preprocessed 1-D signals.
    verbose : bool
        When True, progress percentages are printed during the search.
    sample_rate : int
        Samples per second, used for the window step and to convert the
        best window index into seconds.

    Returns
    -------
    None
        Results are printed and plotted only.

    Raises
    ------
    ValueError
        If either recording is empty.
    """
    if len(audio1) == 0 or len(audio2) == 0:
        raise ValueError("audio1 and audio2 must both contain at least one sample")

    # Window starts are spaced 1/100 of a second apart (at least one sample).
    window_increment = max(1, math.floor(sample_rate / 100))
    window_size_factor = 2

    # The window is half of the moving recording, clamped to the length of the other
    # recording so both compared slices always have the same length.
    window_size_1 = min(math.floor(len(audio1) / window_size_factor), len(audio2))
    print(f"Windows size 1: {window_size_1}")

    # We don't know which audio file is delayed compared to the other, so the analysis runs both ways:
    # first audio1 windows against the start of audio2, then audio2 windows against the start of audio1.
    print("Comparing audio1 to audio2")
    a1a2_distance_scores = _sliding_window_scores(audio1, audio2, window_size_1, window_increment, verbose)

    window_size_2 = min(math.floor(len(audio2) / window_size_factor), len(audio1))
    print(f"Windows size 2: {window_size_2}")

    print("Comparing audio2 to audio1")
    a2a1_distance_scores = _sliding_window_scores(audio2, audio1, window_size_2, window_increment, verbose)

    # The plotted metric is the summed absolute distance, not an MSE.
    plt.plot(a1a2_distance_scores)
    plt.title("Audio 1 sliding window absolute distance")
    plt.show()

    plt.plot(a2a1_distance_scores)
    plt.title("Audio 2 sliding window absolute distance")
    plt.show()

    a1a2_distance_scores_np = np.array(a1a2_distance_scores)
    a2a1_distance_scores_np = np.array(a2a1_distance_scores)

    a1a2_lowest_score_arg = np.argmin(a1a2_distance_scores_np)
    a2a1_lowest_score_arg = np.argmin(a2a1_distance_scores_np)

    print(a1a2_lowest_score_arg)
    print(a2a1_lowest_score_arg)

    a1a2_lowest_score = np.min(a1a2_distance_scores_np)
    a2a1_lowest_score = np.min(a2a1_distance_scores_np)

    # The direction with the lower best score decides which offset is reported.
    if a1a2_lowest_score < a2a1_lowest_score:
        delay = (a1a2_lowest_score_arg * window_increment) / sample_rate
        print(f"Audio 1 needs {delay} second delay")
    else:
        delay = (a2a1_lowest_score_arg * window_increment) / sample_rate
        print(f"Audio 2 needs {delay} second delay")

In [None]:
# Run the sliding-window comparison on the currently loaded pair, with progress output.
calculate_distance_plot(audio1=audio_waltter, audio2=audio_vikture, verbose=True)