### Загрузка аудиофайла

In [40]:
import librosa
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple
from scipy.ndimage import maximum_filter, generate_binary_structure, iterate_structure, binary_erosion
from matplotlib import mlab
from operator import itemgetter
import hashlib
import os
import pandas as pd
from typing import List, Tuple, Dict, Union

In [45]:
DEFAULT_FS = 22050
DEFAULT_WINDOW_SIZE = 1024
DEFAULT_OVERLAP_RATIO = 0.5
DEFAULT_FAN_VALUE = 15
DEFAULT_AMP_MIN = 10
CONNECTIVITY_MASK = 2
PEAK_NEIGHBORHOOD_SIZE = 20

PEAK_SORT = True  # Установите значение в True, если требуется сортировка пиков по времени
MIN_HASH_TIME_DELTA = 0  # Минимальное время между хэшами
MAX_HASH_TIME_DELTA = 200  # Максимальное время между хэшами
FINGERPRINT_REDUCTION = 20  # Значение для сокращения хэшей

def create_fingerprint(file_path: str, duration: int = 20, fan_value: int = DEFAULT_FAN_VALUE) -> List[Tuple[str, int]]:
    """
    Extract audio features and generate hashes for fingerprinting.

    :param file_path: path to the audio file.
    :param duration: duration of audio to consider (in seconds).
    :param fan_value: degree to which a fingerprint can be paired with its neighbors.
    :return: a list of hashes with their corresponding offsets.
    """
    audio_features = extract_audio_features(file_path, duration)
    audio_hashes = generate_hashes(audio_features, fan_value)
    return audio_hashes

def extract_audio_features(file_path: str, duration: int = 20) -> List[Tuple[int, int]]:
    """
    Extract audio features similar to the fingerprinting function.

    :param file_path: path to the audio file.
    :param duration: duration of audio to consider (in seconds).
    :return: list of tuples containing frequency and time components of the audio features.
    """
    signal, sr = librosa.load(file_path, sr=DEFAULT_FS, duration=duration)

    # Calculate spectrogram
    stft = librosa.stft(signal)

    # Convert spectrogram to amplitude (magnitude)
    amplitude = np.abs(stft)

    # Find local maxima using maximum filter
    local_maxima = find_local_maxima(amplitude)

    # Convert indices to frequency and time values
    freqs, times = librosa.core.magphase(stft)[1], librosa.core.frames_to_time(np.arange(amplitude.shape[1]))

    # Get frequency and time values for local maxima
    max_freqs = freqs[local_maxima[0]]
    max_times = times[local_maxima[1]]

    # Return as list of tuples
    return list(zip(max_freqs, max_times))

def find_local_maxima(arr: np.ndarray, amp_min: int = DEFAULT_AMP_MIN) -> Tuple[np.ndarray, np.ndarray]:
    """
    Find local maxima in the given array.

    :param arr: input array.
    :param amp_min: minimum amplitude to be considered a peak.
    :return: indices of local maxima.
    """
    struct = generate_binary_structure(2, CONNECTIVITY_MASK)
    neighborhood = iterate_structure(struct, PEAK_NEIGHBORHOOD_SIZE)

    # Find local maxima using maximum filter
    local_max = maximum_filter(arr, footprint=neighborhood) == arr

    # Apply erosion
    background = (arr == 0)
    eroded_background = binary_erosion(background, structure=neighborhood, border_value=1)

    # Boolean mask of array with True at peaks
    detected_peaks = local_max != eroded_background

    # Extract peaks with amplitude above amp_min
    amps = arr[detected_peaks].flatten()
    filter_idxs = np.where(amps > amp_min)
    peaks_row, peaks_col = np.where(detected_peaks)

    return peaks_row[filter_idxs], peaks_col[filter_idxs]

def generate_hashes(peaks: List[Tuple[int, int]], fan_value: int = DEFAULT_FAN_VALUE) -> List[Tuple[str, int]]:
    """
    Generate hashes for fingerprinting.

    :param peaks: list of peak frequencies and times.
    :param fan_value: degree to which a fingerprint can be paired with its neighbors.
    :return: a list of hashes with their corresponding offsets.
    """
    # Sorting peaks by time, if required
    if PEAK_SORT:
        peaks.sort(key=itemgetter(1))

    hashes = []
    for i in range(len(peaks)):
        for j in range(1, fan_value):
            if (i + j) < len(peaks):
                freq1 = peaks[i][0]  # Frequencies are at index 0
                freq2 = peaks[i + j][0]
                t1 = peaks[i][1]  # Times are at index 1
                t2 = peaks[i + j][1]
                t_delta = t2 - t1

                if MIN_HASH_TIME_DELTA <= t_delta <= MAX_HASH_TIME_DELTA:
                    # Creating a unique hash for the pair of frequencies and time delta
                    h = hashlib.sha1(f"{str(freq1)}|{str(freq2)}|{str(t_delta)}".encode('utf-8'))

                    # Appending the hash and its corresponding time offset to the list of hashes
                    hashes.append((h.hexdigest()[0:FINGERPRINT_REDUCTION], t1))

    return hashes

# Пример использования:
file_path = 'test_data/Lady-Gaga-bad-romance.mp3'
audio_hashes = create_fingerprint(file_path)
print("Audio Hashes:", audio_hashes)

Audio Hashes: [('ee14804b72bdca1f620d', 0.301859410430839), ('c39bd5c187caeff06048', 0.301859410430839), ('0c653ad1ac8bd262a4d7', 0.301859410430839), ('9a22c3ee7f7546cac127', 0.301859410430839), ('4cbccd7ce6c25d71dd58', 0.301859410430839), ('3ba31ea2bc7062061548', 0.301859410430839), ('6e5648044f1d99d21094', 0.301859410430839), ('ce70169ea88ceebc08c5', 0.301859410430839), ('be59605126695bb48209', 0.301859410430839), ('44da34506b0c81b78b94', 0.301859410430839), ('b8d98dcaefef4c6650c0', 0.301859410430839), ('4a883af87a522fafb598', 0.301859410430839), ('37f0909295892945bb1a', 0.301859410430839), ('9f65162253b6d2938512', 0.301859410430839), ('dcf9b0def4aa73c393af', 0.6733786848072563), ('d7362bb04a5847a98fc0', 0.6733786848072563), ('b3b8a877cc4e69b7d4dd', 0.6733786848072563), ('7e7e633ab9084d739905', 0.6733786848072563), ('9166fcba68fb79f169c2', 0.6733786848072563), ('a8b590420f2c81489640', 0.6733786848072563), ('d940c35306069c6b8f52', 0.6733786848072563), ('89e0175086630136cb30', 0.673378

In [22]:
# Функция для обработки всех файлов в директории
def process_directory(directory_path):
    audio_files = [file for file in os.listdir(directory_path) if file.endswith('.mp3')]
    data = []
    for audio_file in audio_files:
        file_path = os.path.join(directory_path, audio_file)
        audio_hashes = create_fingerprint(file_path)
        for hash_data in audio_hashes:
            data.append({'File': audio_file, 'Hash': hash_data[0], 'Offset': hash_data[1]})
        print('processed:', audio_file)
    return data

# Путь к директории с аудиофайлами
directory_path = 'songs'

# Обработка всех файлов в директории и создание DataFrame
processed_data = process_directory(directory_path)
df = pd.DataFrame(processed_data)

# Вывод первых нескольких строк DataFrame
print(df.head())

# Сохранение DataFrame в файл CSV
df.to_csv('audio_hashes.csv', index=False)


processed: 7 Seconds -- Youssou N'Dour, Neneh Cherry.mp3
processed: a-ha -- Take On Me.mp3
processed: ABBA -- Money, Money, Money.mp3
processed: ABBA -- The Winner Takes It All.mp3
processed: All The Things She Said.mp3
processed: Animals — Martin Garrix.mp3
processed: Another One Bites The Dust — Queen.mp3
processed: Apologize.mp3
processed: Appletree.mp3
processed: B.o.B, Hayley Williams of Paramore -- Airplanes (feat. Hayley Williams of Paramore).mp3
processed: B.o.B, Jessie J -- Price Tag.mp3
processed: Bad Bad Boys.mp3
processed: Bad Romance.mp3
processed: Bag Raiders -- Shooting stars.mp3
                                            File                  Hash  \
0  7 Seconds -- Youssou N'Dour, Neneh Cherry.mp3  b55fee60edd7facdd94a   
1  7 Seconds -- Youssou N'Dour, Neneh Cherry.mp3  88af9cb98ba70fc52161   
2  7 Seconds -- Youssou N'Dour, Neneh Cherry.mp3  416cd274fdd077fc9bf5   
3  7 Seconds -- Youssou N'Dour, Neneh Cherry.mp3  ae613eb987fb7224deca   
4  7 Seconds -- Youssou N'Do

In [35]:
def match_hashes(hashes: List[Tuple[str, int]], database: Dict[str, List[Tuple[str, int]]], batch_size: int = 1000) -> Tuple[List[Tuple[str, str, int]], Dict[str, int]]:
    """
    Поиск совпадений хэшей в базе данных.

    :param hashes: список хэшей смещений для поиска.
    :param database: словарь базы данных хэшей смещений.
    :param batch_size: размер пакета для выполнения запросов.
    :return: кортеж с совпадающими парами (хэш, идентификатор_песни, разница_смещений) и словарем с количеством совпадений для каждой песни.
    """
    # Создаем словарь для хранения количества совпадений для каждой песни
    dedup_hashes = {}

    # Список для хранения совпадающих хэшей
    matched_results = []

    for hsh, offset in hashes:
        # Проверяем, есть ли данный хэш в базе данных
        if hsh in database:
            # Перебираем все записи в базе данных для данного хэша
            for sid, db_offset in database[hsh]:
                # Вычисляем разницу смещений
                offset_difference = db_offset - offset
                # Добавляем совпадение в список результатов
                matched_results.append((hsh, sid, offset_difference))
                # Увеличиваем счетчик совпадений для данной песни
                if sid in dedup_hashes:
                    dedup_hashes[sid] += 1
                else:
                    dedup_hashes[sid] = 1

    return matched_results, dedup_hashes


database = {}
for _, row in df.iterrows():
    hsh = row['Hash']
    sid = row['File']
    offset = row['Offset']
    if hsh in database:
        database[hsh].append((sid, offset))
    else:
        database[hsh] = [(sid, offset)]

# Вызов функции сопоставления
matched_results, dedup_hashes = match_hashes(audio_hashes, database)

# Интерпретация результатов
print("Совпадения хэшей:")
for hsh, sid, offset_difference in matched_results:
    print(f"Хэш: {hsh}, Аудиозапись: {sid}, Разница смещений: {offset_difference}")

print("\nКоличество совпадений для каждой аудиозаписи:")
for sid, count in dedup_hashes.items():
    print(f"Аудиозапись: {sid}, Количество совпадений: {count}")

Совпадения хэшей:

Количество совпадений для каждой аудиозаписи:


In [43]:
import sounddevice as sd
PEAK_NEIGHBORHOOD_SIZE = (3, 3)  # Размер структуры фильтра 3x3


def extract_audio_features(audio_data_or_file: Union[str, np.ndarray], fs: int, duration: int = 20) -> List[Tuple[int, int]]:
    """
    Extract audio features similar to the fingerprinting function.

    :param audio_data_or_file: audio data as a numpy array or path to the audio file.
    :param fs: sampling rate of the audio data.
    :param duration: duration of audio to consider (in seconds).
    :return: list of tuples containing frequency and time components of the audio features.
    """
    if isinstance(audio_data_or_file, str):
        signal, sr = librosa.load(audio_data_or_file, sr=fs, duration=duration)
    elif isinstance(audio_data_or_file, np.ndarray):
        signal = audio_data_or_file
        sr = fs
    else:
        raise ValueError("Unsupported input type. Please provide either a path to an audio file or audio data as a numpy array.")

    # Calculate spectrogram
    stft = librosa.stft(signal)

    # Convert spectrogram to amplitude (magnitude)
    amplitude = np.abs(stft)

    # Find local maxima using maximum filter
    local_maxima = find_local_maxima(amplitude)

    # Convert indices to frequency and time values
    freqs, times = librosa.core.magphase(stft)[1], librosa.core.frames_to_time(np.arange(amplitude.shape[1]))

    # Get frequency and time values for local maxima
    max_freqs = freqs[local_maxima[0]]
    max_times = times[local_maxima[1]]

    # Return as list of tuples
    return list(zip(max_freqs, max_times))



def analyze_microphone(duration: int = 20):
    """
    Анализирует аудиосигнал с микрофона и ищет совпадения в базе данных хэшей.

    :param duration: длительность анализа в секундах.
    """
    print("Начинается анализ микрофона...")
    fs = 22050  # Частота дискретизации
    audio_input = sd.rec(int(fs * duration), samplerate=fs, channels=1, dtype='float32')
    sd.wait()

    # Преобразование аудиосигнала в хэши
    audio_features = extract_audio_features(audio_input, fs)

    audio_hashes = generate_hashes(audio_features)

    # Сопоставление хэшей с базой данных
    matched_results, dedup_hashes = match_hashes(audio_hashes, database)

    # Вывод результатов
    print("Совпадения хэшей:")
    for hsh, sid, offset_difference in matched_results:
        print(f"Хэш: {hsh}, Аудиозапись: {sid}, Разница смещений: {offset_difference}")

    print("\nКоличество совпадений для каждой аудиозаписи:")
    for sid, count in dedup_hashes.items():
        print(f"Аудиозапись: {sid}, Количество совпадений: {count}")

# Пример использования
analyze_microphone()


Начинается анализ микрофона...


TypeError: '<' not supported between instances of 'tuple' and 'int'