In [77]:
import tensorflow
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.models import Model
import cv2
import os
import numpy as np
from scipy.signal import spectrogram, resample
import matplotlib.pyplot as plt
import soundfile as sf
import numpy as np
import time
import os
from collections import defaultdict
import librosa.core as lr
import pandas as pd
from operator import itemgetter


# Video fingerprint


### Load pretrained model and cut the last layer
- For this case, I chose to use the output of the second fully connected layer as the encoding of each frame. This gives a vector of 4096 values per frame, so the four frames sampled below produce a matrix of dimensions 4x4096.

In [78]:
# Build VGG16 with its default arguments, then re-wrap it so that the model's
# output is the second-to-last layer instead of the final layer.
model = VGG16()
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)

In [79]:
# Print the layer-by-layer description of the truncated model.
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 224, 224, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 224, 224, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 224, 224, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 112, 112, 64)      0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 112, 112, 128)     73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 112, 112, 128)     147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 56, 56, 128)       0     

### Save image from video
- Load a locally stored video and save several frames taken at different times
- Reshape the images in the input format for the model (224x224)



In [84]:
def video_frames(path_video, number_frames, space_frames, name, folder, size):
    """Sample frames from a video, save them as JPEGs and return them stacked.

    The first frame is read only to learn the resized frame shape. After that,
    one frame is read per iteration and the capture position is moved forward
    by ``fps * space_frames`` frames before the next read.

    Parameters
    ----------
    path_video : str
        Path of the video file to open.
    number_frames : int
        Maximum number of frames to sample.
    space_frames : int or float
        Spacing between sampled frames, in seconds (multiplied by the video
        frame rate to obtain a frame index).
    name : str
        Prefix of the saved files; files are named ``<name>_<k>.jpg`` with
        ``k`` starting at 1.
    folder : str
        Directory the JPEG files are written to.
    size : tuple
        Target size handed to ``cv2.resize``.

    Returns
    -------
    numpy.ndarray
        Array holding the frames that were read, stacked along the first axis.
        It has fewer than ``number_frames`` rows when the video runs out of
        frames early.

    Raises
    ------
    ValueError
        If not even the first frame can be read from ``path_video``.
    """
    vidcap = cv2.VideoCapture(path_video)
    try:
        fps = int(vidcap.get(cv2.CAP_PROP_FPS))

        # Probe frame: used only to size the output tensor.
        success, image = vidcap.read()
        if not success:
            raise ValueError("Could not read any frame from video: " + str(path_video))
        image = cv2.resize(image, size)
        tensor_images = np.zeros(
            [number_frames, image.shape[0], image.shape[1], image.shape[2]]
        )

        num_images = 0
        next_position = 0
        while num_images < number_frames:
            success, image = vidcap.read()
            if not success:
                # End of video (or decode failure): stop instead of passing
                # None to cv2.resize.
                break
            image = cv2.resize(image, size)
            tensor_images[num_images, :, :, :] = image
            num_images += 1

            file_name = name + "_" + str(num_images) + ".jpg"
            cv2.imwrite(os.path.join(folder, file_name), image)

            # Jump ahead by `space_frames` seconds worth of frames.
            next_position += fps * space_frames
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, next_position)
    finally:
        # Always free the underlying decoder / file handle.
        vidcap.release()

    # NOTE(review): frames are returned exactly as OpenCV decodes them, with no
    # model-specific preprocessing - confirm this is what the consumer expects.
    return tensor_images[:num_images]

In [85]:
# Inputs for the frame-sampling step.
path_video = 'videos/l4dSZD3YQ_M.mp4'  # video file to sample
size = (224, 224)                      # frame size handed to cv2.resize
folder = 'videos'                      # directory where the JPEG frames are written
name = 'vid_pics'                      # prefix of the saved frame files
number_frames = 4                      # how many frames to sample
success = True                         # not passed to video_frames
space_frames = 10                      # passed as the `space_frames` argument
tensor_images = video_frames(path_video, number_frames, space_frames, name, folder, size)

- In this case, I selected four frames of a video and created an encoding for each of them. For real video fingerprinting, I propose the following approach: create an encoding for every frame, compare it against the encodings of other videos using a distance metric (e.g. cosine distance), and consider two frames equal enough when the distance falls below a certain threshold.
- It is also possible to use another fully connected layer and check its performance, or to apply dimensionality reduction techniques to make storing and processing the fingerprints lighter.

In [86]:
# Run the truncated model on the stacked frames to obtain the video fingerprint.
video_fpprint = model.predict(tensor_images)

In [87]:
# Display the dimensions of the fingerprint matrix.
video_fpprint.shape

(4, 4096)

In [88]:
# Display the fingerprint values.
video_fpprint

array([[0.        , 2.3146102 , 0.        , ..., 3.040018  , 0.95649546,
        0.        ],
       [0.        , 0.93853045, 0.        , ..., 3.1489646 , 0.65248597,
        0.        ],
       [0.        , 1.0234423 , 0.        , ..., 3.0373263 , 0.5282865 ,
        0.        ],
       [0.        , 1.554539  , 0.        , ..., 3.3475573 , 0.5757041 ,
        0.        ]], dtype=float32)

# Audio fingerprint

- Most of the functions used here are taken from the following repo: https://github.com/notexactlyawe/abracadabra
- And as reference the following paper: Avery Li-Chun Wang, An Industrial-Strength Audio Search Algorithm

### Calculate the spectrogram from a file

In [3]:
def my_spectrogram(audio, sample_rate):
    """Compute a spectrogram using segments one tenth of a second long.

    Parameters
    ----------
    audio : array_like
        Audio samples.
    sample_rate : int or float
        Sampling frequency of ``audio``.

    Returns
    -------
    tuple
        The ``(frequencies, times, spectrogram)`` triple produced by
        ``scipy.signal.spectrogram``.
    """
    segment_length = int(sample_rate / 10)
    frequencies, times, power = spectrogram(audio, sample_rate, nperseg=segment_length)
    return frequencies, times, power

def lr_file_to_spectrogram(filename):
    """Load an audio file as mono at 11025 Hz and return its spectrogram.

    Parameters
    ----------
    filename : str
        Path of the audio file, passed to ``librosa.core.load``.

    Returns
    -------
    Whatever ``my_spectrogram`` returns for the loaded samples.
    """
    target_rate = 11025
    samples, _ = lr.load(filename, sr=target_rate, mono=True)
    return my_spectrogram(samples, target_rate)

### Find peaks in the spectrogram and hash values

In [4]:
def neighbours_within(center, distance, shape):
    """List the in-bounds cells within a Manhattan distance of ``center``.

    Parameters
    ----------
    center : sequence
        ``(row, column)`` of the reference cell; the cell itself is excluded.
    distance : int
        Maximum ``|row offset| + |column offset|`` of a returned cell.
    shape : sequence
        ``(rows, columns)`` bounds; cells outside them are dropped.

    Returns
    -------
    tuple of list
        ``(rows, columns)``: two parallel lists with the coordinates of the
        neighbouring cells.
    """
    rows, cols = [], []
    for col_offset in range(-distance, distance + 1):
        # The remaining "budget" for the row offset shrinks as the column
        # offset grows, which yields a diamond-shaped neighbourhood.
        reach = distance - abs(col_offset)
        col = center[1] + col_offset
        for row_offset in range(-reach, reach + 1):
            if (col_offset, row_offset) == (0, 0):
                continue
            row = center[0] + row_offset
            col_ok = not (col < 0 or col >= shape[1])
            row_ok = not (row < 0 or row >= shape[0])
            if col_ok and row_ok:
                rows.append(row)
                cols.append(col)
    return rows, cols

def find_peaks_fast(arr, distance, point_efficiency=0.4):
    """Greedily pick the largest values of ``arr`` that are spread apart.

    Cells are visited from the largest value to the smallest. A cell becomes a
    peak when its Manhattan distance to every peak already chosen is at least
    ``distance``. The search stops once the number of peaks exceeds a budget
    derived from the array size, ``distance`` and ``point_efficiency``.

    Parameters
    ----------
    arr : numpy.ndarray
        Array to search; the index pairs returned assume it is 2-D.
    distance : int
        Minimum Manhattan distance between two peaks.
    point_efficiency : float, optional
        Scales the peak budget; larger values allow more peaks.

    Returns
    -------
    list of numpy.ndarray
        Index pairs of the selected peaks, strongest first.
    """
    # Flat indices ordered from the largest value to the smallest.
    order = arr.argsort(axis=None)[::-1]
    ranked = np.vstack(np.unravel_index(order, arr.shape)).T

    # `ranked.size` counts every coordinate (rows * columns of `ranked`), not
    # the number of points; kept as-is so the peak budget is unchanged.
    total = ranked.size
    budget = (total / (distance**2)) * point_efficiency

    peaks = []
    for point in ranked:
        if len(peaks) > budget:
            break
        too_close = any(
            abs(point[0] - peak[0]) + abs(point[1] - peak[1]) < distance
            for peak in peaks
        )
        if not too_close:
            peaks.append(point)
    return peaks

def target_zone(anchor, points, width, height, t):
    """Yield the points that fall inside the rectangle attached to ``anchor``.

    The rectangle starts ``t`` after ``anchor[1]`` and spans ``width`` along
    index 1; along index 0 it is centred on ``anchor[0]`` and spans
    ``height``. Both edges are inclusive.

    Parameters
    ----------
    anchor : sequence
        Reference point.
    points : iterable
        Candidate points, each indexable at 0 and 1.
    width, height : number
        Extent of the rectangle along index 1 and index 0 respectively.
    t : number
        Offset of the rectangle's start from ``anchor[1]``.

    Yields
    ------
    The elements of ``points`` lying inside the rectangle, in input order.
    """
    left = anchor[1] + t
    right = left + width
    bottom = anchor[0] - (height*0.5)
    top = bottom + height
    for candidate in points:
        outside = (
            candidate[0] < bottom
            or candidate[0] > top
            or candidate[1] < left
            or candidate[1] > right
        )
        if not outside:
            yield candidate
        
def hash_point_pair(p1, p2):
    """Hash an (anchor, target) pair of points.

    The hash combines index 0 of both points with the difference between their
    index-1 values (target minus anchor). The previous code subtracted
    ``p2[1]`` from itself, so that third component was always zero and pairs
    with different spacing collided.

    NOTE: fixing this changes the hash values, so any fingerprints stored with
    the old formula must be regenerated.

    Parameters
    ----------
    p1 : sequence
        Anchor point.
    p2 : sequence
        Target point.

    Returns
    -------
    int
        Hash of ``(p1[0], p2[0], p2[1] - p1[1])``.
    """
    return hash((p1[0], p2[0], p2[1] - p1[1]))

def hash_points(points, filename):
    """Hash every anchor against the points inside its target zone.

    Parameters
    ----------
    points : sequence
        Points to fingerprint; each one is used as an anchor in turn.
    filename : str
        Label stored alongside every hash.

    Returns
    -------
    list of tuple
        ``(pair_hash, anchor[1], filename)`` for every anchor/target pair.
    """
    return [
        (hash_point_pair(anchor, target), anchor[1], filename)
        for anchor in points
        for target in target_zone(anchor, points, 1, 2000, 0.1)
    ]

def indices_to_tf_pairs(idxs, t, f):
    """Translate index pairs into ``(f value, t value)`` pairs.

    Parameters
    ----------
    idxs : iterable
        Index pairs; element 0 indexes ``f`` and element 1 indexes ``t``.
    t : sequence
        Values looked up with the second index.
    f : sequence
        Values looked up with the first index.

    Returns
    -------
    numpy.ndarray
        One row ``(f[i0], t[i1])`` per entry of ``idxs``.
    """
    pairs = []
    for index in idxs:
        pairs.append((f[index[0]], t[index[1]]))
    return np.array(pairs)


### Calculate the hashes for a file

In [5]:
def register_song(filename, distance=20):
    """Fingerprint an audio file.

    Pipeline: spectrogram -> peak search -> conversion of the peak indices to
    value pairs -> pairwise hashing.

    Parameters
    ----------
    filename : str
        Path of the audio file; also stored with each hash.
    distance : int, optional
        Minimum spacing between peaks, forwarded to ``find_peaks_fast``.

    Returns
    -------
    The list produced by ``hash_points`` for this file.
    """
    frequencies, times, power = lr_file_to_spectrogram(filename)
    peak_indices = find_peaks_fast(power, distance)
    peak_points = indices_to_tf_pairs(peak_indices, times, frequencies)
    return hash_points(peak_points, filename)

In [38]:
def find_matches(test_hashes, hash_db):
    """Look up the hashes of a sample in the database (sequence input).

    NOTE: a later cell of this notebook defines ``find_matches`` again; when
    both cells are run, that later definition is the one in effect.

    Parameters
    ----------
    test_hashes : iterable
        Entries whose element 0 is a hash value.
    hash_db : pandas.DataFrame
        One column per known hash, named by the hash as a string; entry 0 of
        a column is the file name and entry 1 the stored offset.

    Returns
    -------
    collections.defaultdict
        Maps each matched file name to the list of stored offsets (floats) of
        the hashes that matched.
    """
    matches = defaultdict(list)
    for entry in test_hashes:
        key = str(entry[0])
        if key not in hash_db.columns:
            continue
        matched_file = hash_db[key][0]
        stored_offset = float(hash_db[key][1])
        matches[matched_file].append(stored_offset)
    return matches

In [125]:
def find_matches(test_hashes, hash_db):
    """Look up the hashes of a sample in the database.

    Accepts the sample either as a DataFrame (hash values in the first column,
    e.g. a fingerprint reloaded from CSV) or as a plain iterable of entries
    whose element 0 is the hash value. All lookups are positional (``.iloc``),
    so the result does not depend on how rows or columns are labelled.

    Parameters
    ----------
    test_hashes : pandas.DataFrame or iterable
        Sample fingerprint.
    hash_db : pandas.DataFrame
        One column per known hash, named by the hash as a string; the first
        entry of a column is the file name and the second the stored offset.

    Returns
    -------
    collections.defaultdict
        Maps each matched file name to the list of stored offsets (floats) of
        the hashes that matched.
    """
    if isinstance(test_hashes, pd.DataFrame):
        hash_values = test_hashes.iloc[:, 0]
    else:
        hash_values = (entry[0] for entry in test_hashes)

    results = defaultdict(list)
    for value in hash_values:
        key = str(value)
        if key not in hash_db.columns:
            continue
        record = hash_db[key]
        results[record.iloc[0]].append(float(record.iloc[1]))
    return results

In [145]:
def find_song(matches, min_matches=10, min_score=10):
    """Pick the best-matching file, or report that nothing matches.

    Each file with at least ``min_matches`` matched offsets is scored as
    ``count / (std(offsets) + max(offsets))``. The file with the highest score
    wins if that score is above ``min_score``.

    The emptiness check now happens before the best candidate is inspected;
    previously an input in which no file reached the match threshold raised
    ``IndexError`` instead of returning ``'No match'``.

    Parameters
    ----------
    matches : dict
        Maps a file name to the list of offsets of its matched hashes.
    min_matches : int, optional
        Files with fewer matches than this are ignored.
    min_score : number, optional
        The best score must be strictly greater than this to be accepted.

    Returns
    -------
    str
        The winning file name, or ``'No match'``.
    """
    candidates = []
    for filename, offsets in matches.items():
        if len(offsets) < min_matches:
            continue
        score = len(offsets) / (np.std(offsets) + np.max(offsets))
        candidates.append((filename, score, len(offsets)))

    if not candidates:
        return 'No match'

    # max() returns the first maximal element, the same winner a stable
    # descending sort would put first.
    best = max(candidates, key=itemgetter(1))
    if best[1] > min_score:
        return best[0]
    return 'No match'

### Create database of songs

In [10]:
# Build the fingerprint database: hash value -> (file name, offset).
hash_db = {}
# Songs that make up the database.
data = ["songs/metallica_the_unforgiven.wav"
     , "songs/bruno_mars.wav"
     , "songs/one_republic_counting_stars.wav"
    , "songs/michael_beat_it.wav"]
for file in data:
    hashes = register_song(file)
    # Keep only the last path component as the song's name.
    filename = file.split('/')[-1]
    for h, offset, _ in hashes:
        # A hash that is already present is overwritten, so only the most
        # recently registered (file name, offset) survives for that hash.
        hash_db[h] = filename, offset

# Count how many stored hashes belong to each file (not used further in this cell).
files = defaultdict(int)
for h, f in hash_db.items():
    files[f[0]] += 1
# Persist the database: one CSV column per hash, holding file name and offset.
pd.DataFrame(hash_db).to_csv('music_database.csv')







### Create hash of test songs

In [154]:
## Don't run this cell from the git checkout: the songs are too heavy to upload.
# Fingerprint two test songs; the variable names record whether a match is expected.
test_song_no_match = register_song("songs/coldplay_the_scientist.wav")
test_song_match = register_song("songs/one_republic_counting_stars.wav")





In [155]:
# Save the test fingerprints to CSV files.
pd.DataFrame(test_song_no_match).to_csv('coldplay_the_scientist.csv')
pd.DataFrame(test_song_match).to_csv('one_republic_counting_stars.csv')

In [156]:
# Reload the test fingerprints from CSV, dropping the saved index column.
coldplay = pd.read_csv('coldplay_the_scientist.csv').drop(columns='Unnamed: 0')
one_republic = pd.read_csv('one_republic_counting_stars.csv').drop(columns='Unnamed: 0')

### Load database

In [28]:
# Load the fingerprint database from CSV, dropping the saved index column.
music_db = pd.read_csv('music_database.csv').drop(columns='Unnamed: 0')

### Find matches

For the purpose of testing, I loaded two songs: one that is in the database and another that is not.
- The scientist by coldplay is not in the database and counting stars by one republic is.

In [157]:
# Look up each test fingerprint in the database.
coldplay_matches = find_matches(coldplay, music_db)
one_republic_matches = find_matches(one_republic, music_db)

In [158]:
# Identify the song behind the Coldplay fingerprint.
find_song(coldplay_matches)

'No match'

In [159]:
# Identify the song behind the OneRepublic fingerprint.
find_song(one_republic_matches)

'one_republic_counting_stars.wav'