In [None]:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from moviepy.editor import *
from tqdm import tqdm
from collections import Counter

from pydub import AudioSegment
from pydub.silence import detect_nonsilent

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio
import numpy as np

from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

from sklearn.metrics import silhouette_score

import joblib


pd.set_option('display.max_rows', 500)
pd.set_option("display.max_columns", 500)
pd.set_option('display.float_format', lambda x: '%.6f' % x)


In [None]:

AUDIO_OUTPUT_PATH = 'processed_video/'

MUSIC_FEATURE_BEFORE_CLUSTER = 'processed_feature/music.csv'
MUSIC_FEATURE = 'processed_feature/music_with_cluster.csv'

KMEANS_MODEL = "cluster_model/kmeans_model.pkl"
YAMNET_MODEL = '/Users/yujizhao/Projects/analysis/video/pretrained_models/yamnet'


## Handcrafted Audio Feature 

In [None]:
# Zero-Crossing Rate，ZCR
# 是指一个信号的符号变化的比率，例如信号从正数变成负数或反向。具体来说，单位时间内信号通过零值的次数就称为过零率。
# 这个特征在电磁学领域得到广泛使用，常用于分析一段电流。对于连续语音信号，过零率可以考查其时域波形通过时间轴的情况。
# 对于离散信号，实质上就是信号采样点符号变化的次数。在一定程度上，过零率可以反映出频率的信息，比如正弦信号的平均过零率就是信号的频率除以两倍采样频率。
# 浊音平均过零率低,集中在低频端; 轻音过零率高,集中在高频端 

# Zero-Crossing Rate (ZCR): The rate at which the audio signal changes sign. It's useful for detecting the presence of noise or percussive sounds.
# Amplitude Envelope: The smooth curve outlining the extremes of the audio waveform. It can represent the dynamics of the signal.
# envelope = librosa.onset.onset_strength(y)
# Energy: The sum of squares of the signal amplitude, representing the loudness of the audio.


def detect_sounds(audio_file, silence_thresh=-40, min_silence_len=500):
    audio = AudioSegment.from_file(audio_file)
    # Detect non-silent periods
    nonsilent_ranges = detect_nonsilent(audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh)
    return audio, nonsilent_ranges

def calculate_non_silent_duration(nonsilent_ranges):
    # Calculate the total duration of non-silent periods in milliseconds
    total_non_silent_duration = sum(end_ms - start_ms for start_ms, end_ms in nonsilent_ranges)
    return total_non_silent_duration / 1000  


def sqrt_energy(x):
    squared_sum = np.square(x).sum()
    energy = squared_sum / len(x)
    rms_energy = np.sqrt(energy)
    return rms_energy

def create_folder(directory):    
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    
# Define frequency bands (in Hz)
bands = {
    'sub-bass': (20, 60),
    'bass': (60, 250),
    'low-midrange': (250, 500),
    'midrange': (500, 2000),
    'upper-midrange': (2000, 4000),
    'presence': (4000, 6000),
    'brilliance': (6000, 20000)
}

# Convert frequency bands to bins
def freq_to_bin(frequency, sr, n_fft):
    return int(frequency * n_fft / sr)

def classify_bpm_detailed(bpm):
    if bpm <= 40:
        return 'grave' # 'Grave (very slow)'
    elif bpm <= 60:
        return 'largo'# 'Largo (broadly)'
    elif bpm <= 66:
        return  'larghetto' # 'Larghetto (rather broadly)'
    elif bpm <= 76:
        return 'adagio' # 'Adagio (slow and stately)'
    elif bpm <= 108:
        return 'andante' # 'Andante (at a walking pace)'
    elif bpm <= 120:
        return 'moderato' # 'Moderato (moderately)'
    elif bpm <= 156:
        return 'allegro' # 'Allegro (fast, quickly and bright)'
    elif bpm <= 176:
        return 'vivace' # 'Vivace (lively and fast)'
    else:
        return 'presto' # 'Presto (extremely fast)'


In [None]:

music_features = pd.DataFrame()
n_fft = 2048
# separator = Separator('spleeter:2stems') ## Separator does not work properly so not used in final features 
for x in tqdm(final_dataset[idx].values):
    file_path = video_path+'/'+x+'.mp4'
    video = VideoFileClip(file_path)
    output_directory = AUDIO_OUTPUT_PATH + x 
    create_folder(output_directory)
    cur_dict = {'creative_code': x} 
    audio_file = output_directory + '/origin_audio.wav'

    if not os.path.exists(audio_file):
        audio = video.audio 
        audio.write_audiofile(audio_file, logger=None)

    #     # Separate the audio file to accompaniment and vocals 
    #     # separator.separate_to_file(audio_file, output_directory)
    #     separate_to_file_with_timeout(separator, audio_file, output_directory, timeout_seconds)

    wave_data, sr = librosa.load(audio_file)
    
    cur_dict['zcr'] = sum(librosa.zero_crossings(y=wave_data, pad=False)) / len(wave_data)  
    rms = librosa.feature.rms(y=wave_data)[0]
    cur_dict['rms_average'] = np.sum(rms) / len(rms)  # 求全部帧的短时能量均值
    cur_dict['rms_diff'] = np.max(rms) - np.min(rms)
    cur_dict['rms_std'] = np.std(rms)  
    cur_dict['rms_high'] = np.quantile(rms, 0.75)  
    cur_dict['rms_low'] = np.quantile(rms, 0.25) 
    cur_dict['energy'] = sqrt_energy(rms)

    stft = np.abs(librosa.stft(wave_data))
    freq_bins = {band: (freq_to_bin(low, sr, n_fft), freq_to_bin(high, sr, n_fft)) for band, (low, high) in bands.items()}

    # Compute the energy in each frequency band
    band_energies = {}
    for band, (low_bin, high_bin) in freq_bins.items():
        band_energies[band] = np.sum(stft[low_bin:high_bin, :])
    # Normalize by the total energy
    total_energy = np.sum(stft)
    normalized_band_energies = {band: energy / total_energy for band, energy in band_energies.items()}
    cur_dict['frequency_bands_sub_bass'] = normalized_band_energies['sub-bass']
    cur_dict['frequency_bands_bass'] = normalized_band_energies['bass']
    cur_dict['frequency_bands_low_midrange'] = normalized_band_energies['low-midrange']
    cur_dict['frequency_bands_midrange'] = normalized_band_energies['midrange']
    cur_dict['frequency_bands_upper_midrange'] = normalized_band_energies['upper-midrange']
    cur_dict['frequency_bands_presence'] = normalized_band_energies['presence']
    cur_dict['frequency_bands_brilliance'] = normalized_band_energies['brilliance']

    n_mfcc = 100
    mfccs = np.mean(librosa.feature.mfcc(y=wave_data, sr=sr, n_mfcc=n_mfcc).T,axis=0)
    np.save(os.path.join(AUDIO_OUTPUT_PATH, x)+'/audio_mfcc100.npy', mfccs)

    # tempo 
    cur_dict['overall_tempo'] = librosa.feature.tempo(y=wave_data, sr=sr)[0]
    
    tempo, beat_frames = librosa.beat.beat_track(y=wave_data, sr=sr)
    beat_times = librosa.frames_to_time(beat_frames, sr=sr)
    bpm = 60 / np.diff(beat_times)  # Calculate BPM from inter-beat intervals
    bpm_categories = [classify_bpm_detailed(b) for b in bpm]
    bpm_distribution = Counter(bpm_categories)
    total_bpm = sum(bpm_distribution.values())
    bpm_ratio = {category: count / total_bpm for category, count in bpm_distribution.items()}
    for cat in ['grave', 'largo', 'larghetto','adagio','andante','moderato','allegro','vivace','presto']: 
        cur_dict['beats_'+cat] = bpm_ratio.get(cat, 0.0)
    
    cur_df = pd.DataFrame(cur_dict, index=[0])
    music_features = pd.concat([music_features, cur_df], axis=0)
    if len(music_features) % 10 == 0:
        music_features.to_csv(MUSIC_FEATURE_BEFORE_CLUSTER,index=False) 
music_features.to_csv(MUSIC_FEATURE_BEFORE_CLUSTER,index=False) 


In [None]:
music_features.to_csv(MUSIC_FEATURE_BEFORE_CLUSTER,index=False) 


## Audio Cluster Feature

In [None]:

def load_wav_16k_mono(filename):
    """ Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio. """
    file_contents = tf.io.read_file(filename)
    wav, sample_rate = tf.audio.decode_wav(
          file_contents,
          desired_channels=1)
    wav = tf.squeeze(wav, axis=-1)
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    wav = tfio.audio.resample(wav, rate_in=sample_rate, rate_out=16000)
    return wav

# Function to extract YAMNet embeddings from audio file
def extract_yamnet_embeddings(model, audio_file):
    wav_data = load_wav_16k_mono(audio_file)
    scores, embeddings, spectrogram = model(wav_data)
    return tf.expand_dims(embeddings, axis=-1).numpy()

# Function to compute average embedding
def compute_avg_embedding(embeddings):
    avg_embedding = tf.reduce_mean(embeddings, axis=0, keepdims=True)
    return np.reshape(avg_embedding, -1)

# Function to perform K-means clustering on embeddings
def cluster_audio_segments(embeddings, num_clusters, joblib_file):
    kmeans = KMeans(n_clusters=num_clusters)
    cluster_model = kmeans.fit(embeddings)
        
    # Save the model to a file
    joblib.dump(cluster_model, joblib_file)
    print(f"Model saved to {joblib_file}")
    
    clusters = kmeans.fit_predict(embeddings)
    return clusters


# Function to visualize clusters using PCA
def visualize_clusters(embeddings, clusters, num_clusters, video_ids=None):
    pca = PCA(n_components=2)
    reduced_embeddings = pca.fit_transform(embeddings)
    
    plt.figure(figsize=(10, 8))
    for cluster_id in range(num_clusters):
        cluster_points = reduced_embeddings[clusters == cluster_id]
        plt.scatter(cluster_points[:, 0], cluster_points[:, 1], label=f'Cluster {cluster_id}')
        if video_ids is not None:
            for i, point in enumerate(cluster_points):
                plt.annotate(video_ids[i], (point[0], point[1]), textcoords="offset points", xytext=(0,10), ha='center')
    
    plt.title('YAMNet Audio Clustering')
    plt.xlabel('PCA Component 1')
    plt.ylabel('PCA Component 2')
    plt.legend()
    plt.grid(True)
    plt.show()


In [None]:
try: 
    # Load the model from the file
    loaded_kmeans = joblib.load(KMEANS_MODEL)
    print(f"Model loaded from {KMEANS_MODEL}")
except:
    ## RUN K MEANS 
    
    # Load YAMNet model from TensorFlow Hub
    # YAMNET_URL = "https://tfhub.dev/google/yamnet/1"
    model = hub.load(YAMNET_MODEL)
    audio_dirs = os.listdir(AUDIO_OUTPUT_PATH)  

    # Extract YAMNet embeddings for each audio file
    embeddings_list = []
    video_ids_list = []

    for x in audio_dirs:
        if x == '.DS_Store': continue
        audio_file = AUDIO_OUTPUT_PATH + os.path.join(x, 'origin_audio.wav')  
        embeddings = extract_yamnet_embeddings(model, audio_file)
        embeddings_list.append(compute_avg_embedding(embeddings))
        video_ids_list.append(x)

    all_embeddings = np.vstack(embeddings_list)
    video_ids = np.array(video_ids_list)

    print(all_embeddings.shape)
    unique_rows, unique_indices = np.unique(all_embeddings, axis=0, return_index=True)
    unique_video_ids = video_ids[unique_indices]
    print(unique_rows.shape)

    # Generate synthetic dataset
    X = unique_rows 
    MAX_K = 20
    # Elbow Method
    wcss = []
    for k in range(1, MAX_K+1):
        kmeans = KMeans(n_clusters=k, random_state=42)
        kmeans.fit(X)
        wcss.append(kmeans.inertia_)

    plt.plot(range(1, MAX_K+1), wcss, 'bx-')
    plt.xlabel('Number of clusters (k)')
    plt.ylabel('WCSS')
    plt.title('Elbow Method for Optimal k')
    plt.show()

    # Silhouette Method
    silhouette_scores = []
    for k in range(2, MAX_K+1):
        kmeans = KMeans(n_clusters=k, random_state=42)
        kmeans.fit(X)
        score = silhouette_score(X, kmeans.labels_)
        silhouette_scores.append(score)

    plt.plot(range(2, MAX_K+1), silhouette_scores, 'bx-')
    plt.xlabel('Number of clusters (k)')
    plt.ylabel('Silhouette Score')
    plt.title('Silhouette Method for Optimal k')
    plt.show()

    BEST_K = 6

    clusters = cluster_audio_segments(unique_rows, BEST_K, KMEANS_MODEL)
    # Visualize clusters
    visualize_clusters(unique_rows, clusters, BEST_K, unique_video_ids)


In [None]:
model = hub.load(YAMNET_MODEL)
cluster_feature = []
for vid in music_features.index:
    audio_file = AUDIO_OUTPUT_PATH + os.path.join(vid, 'origin_audio.wav')  
    embeddings = extract_yamnet_embeddings(model, audio_file) 
    pool_embeddings = np.array(compute_avg_embedding(embeddings)).reshape(1, -1) 
    np.save(os.path.join(AUDIO_OUTPUT_PATH, vid)+'/audio_vec.npy', pool_embeddings)
    prediction = loaded_kmeans.predict(pool_embeddings)
    cluster_feature.append(str(prediction[0]))
music_features_set2 = music_features.copy()
music_features_set2['audio_cluster'] = cluster_feature
print(music_features_set2.groupby('audio_cluster')['audio_cluster'].count())
music_features_set2 = pd.concat([music_features, pd.get_dummies(music_features_set2[['audio_cluster']].fillna('NaN').applymap(
                                        lambda x: str(x).lower() if isinstance(x, str) else x
                                        ), dtype=int)], axis=1)
music_features = music_features_set2.copy()
music_features.to_csv(MUSIC_FEATURE, index=True) 
