# Imports

In [37]:
from google.colab import drive
drive.mount('/content/drive')
!pip install pytubefix # For downloading from YouTube to mp3
!pip install essentia # For musical analysis
from essentia.standard import MusicExtractor
!apt-get install ffmpeg # For converting sound file formats
import os # For conversion and file handling
from pytubefix import YouTube
from pytubefix.cli import on_progress
!pip install yt-dlp # For Youtube search by artist/song
import json
import pandas as pd
import tensorflow as tf
import numpy as np
import tensorflow as tf
import librosa
import os
import time
import re
import chardet
import subprocess

# Switch every visible GPU to on-demand memory allocation ("memory growth").
# A RuntimeError raised while configuring is printed instead of propagated.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:  # Stay silent when no GPU is present
        print("✅ GPU memory growth enabled")
except RuntimeError as e:
    print(e)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 20 not upgraded.
✅ GPU memory growth enabled


# Input Definition

In [45]:
# CSV file that stores the features of every song analyzed so far
cache_file = '/content/drive/My Drive/Music and War/Cache/song_features_cache.csv'

# Names of all the features that are extracted for each song
feature_list = [
    'key_scale', 'dynamic_complexity', 'happy', 'non_happy', 'arousal', 'valence', 'danceability',
    'aggressive', 'non_aggressive', 'sad', 'non_sad', 'relaxed', 'non_relaxed', 'party', 'non_party',
]

# Holder for the features of the current song; '-7777' is the initial placeholder value.
sp_results = dict.fromkeys(feature_list, '-7777')

# Start from the existing cache when there is one, otherwise from an empty table
if not os.path.exists(cache_file):
    cached_songs = pd.DataFrame(columns=['song_name', 'artist_name', *feature_list])
    print('No cache found, creating a new cache.')
else:
    # The first CSV column is used as the index (the song identifier)
    cached_songs = pd.read_csv(cache_file, encoding='utf-8-sig', index_col=0)
    print('Found existing cache.')

Found existing cache.


# Definitions

## search_and_download
Download a song from YouTube by song name and artist name

In [3]:
def search_and_download(index, this_song):
    """
    Search YouTube for a song, download its audio and convert it to WAV.

    yt-dlp and ffmpeg are invoked with argument lists (no shell), so artist or
    song names containing characters such as parentheses, '&' or quotes cannot
    break or alter the commands.

    Parameters:
    - index: Value used as prefix of the temporary file names.
    - this_song (dict): Must provide the keys 'artist_name' and 'song_name'.

    Returns:
    - tuple: (wav_file_name, video_url, video_title) on success, or
      ('no_file', None, None) when the search, download or conversion failed.
    """
    # Search and get the first result
    query = f"{this_song['artist_name']} {this_song['song_name']}"
    search_query = f"ytsearch:{query}"  # yt-dlp search format

    # Define file names. Same sanitization as download_audio: every character
    # that is not a word character, '-' or '.' becomes '_'. This also keeps '%'
    # out of the name, which yt-dlp would treat as an output-template field.
    safe_artist = re.sub(r'[^\w\-.]', '_', str(this_song['artist_name'])).strip('_')
    safe_song = re.sub(r'[^\w\-.]', '_', str(this_song['song_name'])).strip('_')
    input_file = f"{index}_{safe_artist}_{safe_song}.m4a"
    output_file = f"{index}_{safe_artist}_{safe_song}.wav"

    # Use yt-dlp to extract metadata and download the audio
    command = [
        "yt-dlp", "-f", "bestaudio", "--extract-audio", "--audio-format", "m4a",
        "--print-json", "--output", input_file, search_query,
    ]
    try:
        completed = subprocess.run(command, capture_output=True, text=True,
                                   encoding="utf-8", errors="replace")
    except OSError as e:  # e.g. yt-dlp is not installed
        print(f"Error running yt-dlp: {e}")
        return 'no_file', None, None
    result = completed.stdout or ""

    if not result.strip():
        print(f"No results found for query: {query}")
        return 'no_file', None, None

    try:
        metadata = json.loads(result.strip().splitlines()[0])
        video_url = metadata["webpage_url"]
        video_title = metadata["title"]
    except (IndexError, KeyError, TypeError, json.JSONDecodeError) as e:
        print(f"Error parsing yt-dlp result: {e}")
        return 'no_file', None, None

    if not os.path.exists(input_file):
        print("Download failed: No audio file found.")
        return 'no_file', None, None

    # Convert M4A to WAV. '-y' overwrites a stale WAV from an earlier run and
    # stdin is detached so ffmpeg can never block waiting for an answer.
    try:
        subprocess.run(["ffmpeg", "-y", "-i", input_file, output_file],
                       stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:  # e.g. ffmpeg is not installed
        print(f"Error running ffmpeg: {e}")
    finally:
        if os.path.exists(input_file):
            os.remove(input_file)

    if not os.path.exists(output_file):
        print(f"Conversion failed: {output_file} was not created.")
        return 'no_file', None, None
    print(f"Converted {input_file} to {output_file}")

    # Report metadata
    print(f"Video URL: {video_url}")
    print(f"Video Title: {video_title}")

    return output_file, video_url, video_title

## download_audio
Download a song from youtube based on input url

In [43]:
def download_audio(index, youtube_url, artist_name, song_name):
    """
    Download the audio of a given YouTube URL and convert it to WAV.

    Parameters:
    - index: Not used by this function; kept so existing callers keep working.
    - youtube_url (str): URL of the video to download.
    - artist_name (str): Artist name, used to build the file name.
    - song_name (str): Song name, used to build the file name.

    Returns:
    - tuple: (wav_file_name, youtube_url, video_title) on success, or
      ('no_file', None, None) when the download or conversion failed.
    """
    # Sanitize file names
    safe_artist = re.sub(r'[^\w\-.]', '_', str(artist_name)).strip('_')
    safe_song = re.sub(r'[^\w\-.]', '_', str(song_name)).strip('_')

    input_file = f"{safe_artist}_{safe_song}.m4a"
    output_file = f"{safe_artist}_{safe_song}.wav"

    # Run yt-dlp and capture JSON output
    command = [
        "yt-dlp", "-f", "bestaudio", "--extract-audio", "--audio-format", "m4a",
        "--print-json", "--output", input_file, youtube_url
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True,
                                encoding="utf-8", errors="replace")
    except (OSError, ValueError) as e:  # e.g. yt-dlp missing or an invalid argument
        print(f"Error processing yt-dlp output: {e}")
        return 'no_file', None, None

    # Find the first line that is a JSON object; other lines (warnings etc.) are skipped
    metadata = None
    for line in (result.stdout or "").strip().split("\n"):
        try:
            candidate = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(candidate, dict):
            metadata = candidate
            break

    if not metadata:
        print("Error: No valid JSON metadata found.")
        return 'no_file', None, None

    video_title = metadata.get("title", "Unknown Title")

    if not os.path.exists(input_file):
        print("Download failed: No audio file found.")
        return 'no_file', None, None

    # Convert M4A to WAV without a shell. '-y' overwrites a stale WAV from an
    # earlier run and stdin is detached so ffmpeg can never block on a prompt.
    try:
        subprocess.run(["ffmpeg", "-y", "-i", input_file, output_file],
                       stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:  # e.g. ffmpeg is not installed
        print(f"Error running ffmpeg: {e}")
    finally:
        if os.path.exists(input_file):
            os.remove(input_file)

    # Only report success when the WAV file really exists
    if not os.path.exists(output_file):
        print(f"Conversion failed: {output_file} was not created.")
        return 'no_file', None, None

    return output_file, youtube_url, video_title

## list_csv_files_in_folder
Return a list of all CSV files in a folder

In [5]:
def list_csv_files_in_folder(folder_path):
    """
    List the CSV files found directly inside a folder.

    Parameters:
    - folder_path (str): Folder to scan (sub-folders are not searched).

    Returns:
    - tuple: (DataFrame with a single 'File Name' column, number of CSV files).
    """
    # os.listdir returns entries in arbitrary order; sort them so the files are
    # always processed in the same order from run to run.
    csv_files = sorted(file for file in os.listdir(folder_path) if file.endswith('.csv'))
    df = pd.DataFrame({'File Name': csv_files})  # Create a DataFrame with file names
    print(f"Found {len(csv_files)} CSV file(s) in the folder.")

    return df, len(csv_files)

## read_csv_file
Read csv file of Spotify data into a dataframe

In [6]:
def read_csv_file(file_name, file_path):
    """
    Load one CSV file into a DataFrame and report how many rows it has.

    Parameters:
    - file_name (str): Name of the CSV file.
    - file_path (str): Folder containing the file.

    Returns:
    - tuple: (DataFrame with the file contents, number of rows).
    """
    chart = pd.read_csv("{}/{}".format(file_path, file_name))
    song_count = len(chart)  # One row per song
    print(f"Number of songs found: {song_count}")

    return chart, song_count

## get_song_details
Get song details
df - dataframe in the form of a Spotify CSV
index - index to a song (row) in the dataframe

In [7]:
def get_song_details(df, index):
    """
    Return the artist and song name stored in one row of a chart.

    Parameters:
    - df (DataFrame): Chart with the columns 'artist_names' and 'track_name'.
    - index (int): Row to read; must satisfy 0 <= index < len(df).

    Returns:
    - dict: {'artist_name': ..., 'song_name': ...}

    Raises:
    - IndexError: If index is outside the valid range.
    """
    out_of_range = index < 0 or index >= len(df)
    if out_of_range:
        raise IndexError("Index out of range.")

    # Output key -> chart column holding its value
    column_for = {'artist_name': 'artist_names', 'song_name': 'track_name'}
    return {key: df.loc[index, column] for key, column in column_for.items()}

## analyze_song
Analyze a song using Essentia basic models (not specialized machine-learning models)

In [8]:
def analyze_song(audio_file):
    """
    Extract a few descriptors from an audio file with MusicExtractor.

    Parameters:
    - audio_file (str): File name of a locally stored .wav file.

    Returns:
    - dict: With the keys 'danceability', 'key_scale' and 'dynamic_complexity'.
    """
    descriptors, _ = MusicExtractor()(audio_file)

    # Result key -> name of the descriptor in the extractor output
    wanted = (
        ('danceability', 'rhythm.danceability'),
        ('key_scale', 'tonal.key_temperley.scale'),
        ('dynamic_complexity', 'lowlevel.dynamic_complexity'),
    )
    return {result_key: descriptors[descriptor] for result_key, descriptor in wanted}

## add_columns
Add feature columns to the dataframe, such as 'danceability'...

In [9]:
def add_columns(df, feature_list):
    """
    Make sure the DataFrame has one column per feature.

    Columns that are missing are created and filled with None; existing columns
    are left untouched. The DataFrame is modified in place and also returned.

    Parameters:
    - df (DataFrame): Table to extend.
    - feature_list (list): Names of the feature columns.

    Returns:
    - DataFrame: The same df object.
    """
    # filter() is lazy, so every name is checked against the current columns
    for new_column in filter(lambda name: name not in df.columns, feature_list):
        df[new_column] = None

    return df

## update_chart
Update the chart with input values of features. These features were already extracted by a separate function.

In [10]:
def update_chart(this_chart, song_ind, song_features):
    """
    Write the features of one song into its row of the chart.

    Features whose name is not a column of the chart are ignored. The chart is
    modified in place and also returned.

    Parameters:
    - this_chart (DataFrame): Chart to update.
    - song_ind: Row label of the song.
    - song_features (dict): Feature name -> value.

    Returns:
    - DataFrame: The same this_chart object.
    """
    for field in song_features:
        if field not in this_chart.columns:
            continue  # Unknown feature, nothing to store
        this_chart.at[song_ind, field] = song_features[field]

    return this_chart

## update_results_missing
Update sp_results with -9999 for all fields, indicating a missing song

In [11]:
def update_results_missing(sp_results):
    """
    Mark every feature of a song as missing.

    Each name in the module-level feature_list is set to the string '-9999' in
    sp_results. The dictionary is modified in place and also returned.

    Parameters:
    - sp_results (dict): Feature dictionary to overwrite.

    Returns:
    - dict: The same sp_results object.
    """
    sp_results.update(dict.fromkeys(feature_list, '-9999'))
    return sp_results

## get_song_features_from_cache
Look for a song in the cache. If found, retrieve its features.

In [12]:
def get_song_features_from_cache(song_id, cached_songs, feature_list):
    """
    Look a song up in the cache and return its stored features.

    Parameters:
    - song_id (str): Identifier of the song, i.e. its label in the cache index.
    - cached_songs (DataFrame): Cache table, indexed by song identifier.
    - feature_list (list): Names of the features to read.

    Returns:
    - dict: Feature name -> cached value, or None when the song is not cached.
    """
    # Guard clause: nothing to return for an unknown song
    if song_id not in cached_songs.index:
        print(f"Song {song_id} not found in cache.")
        return None

    print(f"Retrieving cached features for {song_id}")
    cached_row = cached_songs.loc[song_id]

    features = {}
    for name in feature_list:
        features[name] = cached_row[name]
    return features

## load_model
Load a model from a pb file


In [13]:
def load_model(pb_file):
    """
    Read a serialized TensorFlow graph definition from a .pb file.

    Parameters:
        pb_file (str): Path to the .pb file.

    Returns:
        GraphDef: The parsed graph definition (not yet imported into a graph).
    """
    with tf.io.gfile.GFile(pb_file, "rb") as pb_handle:
        serialized_graph = pb_handle.read()

    graph_def = tf.compat.v1.GraphDef()
    graph_def.ParseFromString(serialized_graph)
    return graph_def

## analyze_with_pb
Analyze a song with a pb model

In [14]:
def analyze_with_pb(vggish_pb, mood_pb, embeddings):
    """
    Run a mood prediction model on already computed VGGish embeddings.

    The previous version returned an undefined name and never ran the mood
    model. It also opened a VGGish session whose result was not used; that
    step is dropped because the embeddings are passed in directly.

    Parameters:
        vggish_pb (str): Path to the VGGish .pb file. Not used; kept so the
            signature stays unchanged.
        mood_pb (str): Path to the mood prediction .pb file.
        embeddings (np.ndarray): Embeddings to feed to the mood model
            (presumably the output of get_embeddings - TODO confirm).

    Returns:
        dict: {'predictions': raw output of the mood model}. The meaning of
        each output column depends on the model and is not interpreted here.
    """
    mood_graph_def = load_model(mood_pb)
    with tf.Graph().as_default() as mood_graph:
        tf.import_graph_def(mood_graph_def, name="")
        with tf.compat.v1.Session(graph=mood_graph) as sess:
            # Same tensor names as used for the mood models elsewhere in this notebook
            input_tensor = mood_graph.get_tensor_by_name("model/Placeholder:0")
            output_tensor = mood_graph.get_tensor_by_name("model/Softmax:0")
            predictions = sess.run(output_tensor, feed_dict={input_tensor: embeddings})

    results = {'predictions': predictions}
    return results

## preprocess_audio

In [15]:
def preprocess_audio(file_path):
    """
    Convert an audio file into one fixed-size log-mel spectrogram patch.

    The audio is loaded as 16 kHz mono, turned into a 64-bin mel spectrogram,
    log-scaled, and padded or cut to exactly 96 frames.

    NOTE(review): only the first 96 frames (roughly the first second, given the
    10 ms hop) are kept, so the rest of the song is discarded - confirm that
    this is intended.
    NOTE(review): the result has shape (1, 64, 96), i.e. (batch, mel, frame) -
    confirm that this is the layout the embedding model expects.

    Parameters:
    - file_path (str): Path of the audio file to load.

    Returns:
    - np.ndarray: float32 array of shape (1, 64, 96).
    """
    sample_rate = 16000
    target_frames = 96

    waveform, _ = librosa.load(file_path, sr=sample_rate, mono=True)

    mel = librosa.feature.melspectrogram(
        y=waveform,
        sr=sample_rate,
        n_fft=400,      # 25ms window
        hop_length=160, # 10ms step
        n_mels=64       # 64 mel bins
    )

    # Small offset avoids log(0)
    log_mel = np.log(mel + 1e-6)

    # Pad short clips with zeros at the end, cut long ones
    frames_missing = target_frames - log_mel.shape[1]
    if frames_missing > 0:
        log_mel = np.pad(log_mel, ((0, 0), (0, frames_missing)), mode='constant')
    else:
        log_mel = log_mel[:, :target_frames]

    # Add batch dimension
    return log_mel[np.newaxis, :, :].astype(np.float32)

## get_embeddings
Get VGGish embeddings

In [16]:
def get_embeddings(audio_input, graph_def):
    """
    Run the VGGish graph on preprocessed audio and return its embeddings.

    The graph definition is imported into a new graph and a new session is
    opened on every call. Device placement logging is enabled for the session.

    Parameters:
    - audio_input (np.ndarray): Input fed to the "model/Placeholder:0" tensor.
    - graph_def (GraphDef): Parsed VGGish graph definition.

    Returns:
    - The value of the "model/vggish/embeddings:0" tensor.
    """
    vggish_graph = tf.Graph()
    with vggish_graph.as_default():
        tf.import_graph_def(graph_def, name="")
        session_config = tf.compat.v1.ConfigProto(log_device_placement=True)  # Enable logging
        with tf.compat.v1.Session(graph=vggish_graph, config=session_config) as sess:
            placeholder = vggish_graph.get_tensor_by_name("model/Placeholder:0")
            embedding_tensor = vggish_graph.get_tensor_by_name("model/vggish/embeddings:0")
            return sess.run(embedding_tensor, feed_dict={placeholder: audio_input})

## get_happy
Get happy feature for song using Essentia's pre-loaded model.

In [17]:
def get_happy(song_embeddings, mood_happy_sess, mood_happy_input, mood_happy_output):
    """
    Run the happy model on song embeddings and return its two scores.

    Only the first row of the predictions is used; the raw predictions are printed.

    NOTE(review): column 0 is treated as "happy" and column 1 as "non-happy";
    the class order is not visible here - confirm against the model metadata.

    Parameters:
    - song_embeddings: Embeddings fed to the model input.
    - mood_happy_sess: Session holding the happy model.
    - mood_happy_input: Input tensor of the model.
    - mood_happy_output: Output tensor of the model.

    Returns:
    - tuple: (happy_prob, not_happy_prob)
    """
    feed = {mood_happy_input: song_embeddings}
    happy_predictions = mood_happy_sess.run(mood_happy_output, feed_dict=feed)

    first_row = happy_predictions[0]
    scores = (first_row[0], first_row[1])

    print('***** happy predictions raw: ********')
    print(happy_predictions)
    return scores

## get_deam
Function to extract valence and arousal using Essentia model.

In [18]:
def get_deam(song_embeddings, deam_sess, deam_input, deam_output):
    """
    Run the DEAM model on song embeddings and return arousal and valence.

    Only the first row of the predictions is used.

    NOTE(review): column 0 is assumed to be arousal and column 1 valence;
    the output order is not visible here - confirm against the model metadata.

    Parameters:
    - song_embeddings: Embeddings fed to the model input.
    - deam_sess: Session holding the DEAM model.
    - deam_input: Input tensor of the model.
    - deam_output: Output tensor of the model.

    Returns:
    - tuple: (arousal, valence)
    """
    feed = {deam_input: song_embeddings}
    first_row = deam_sess.run(deam_output, feed_dict=feed)[0]

    return first_row[0], first_row[1]

## get_aggressive
Function to extract aggressive score using Essentia model.

In [19]:
def get_aggressive(song_embeddings, aggressive_sess, aggressive_input, aggressive_output):
    """
    Run the aggressive model on song embeddings and return its two scores.

    Only the first row of the predictions is used.

    NOTE(review): column 0 is treated as "aggressive" and column 1 as
    "non-aggressive"; the class order is not visible here - confirm against
    the model metadata.

    Parameters:
    - song_embeddings: Embeddings fed to the model input.
    - aggressive_sess: Session holding the aggressive model.
    - aggressive_input: Input tensor of the model.
    - aggressive_output: Output tensor of the model.

    Returns:
    - tuple: (aggressive, non_aggressive)
    """
    feed = {aggressive_input: song_embeddings}
    first_row = aggressive_sess.run(aggressive_output, feed_dict=feed)[0]

    return first_row[0], first_row[1]

## get_sad
Function to extract sad score using Essentia model.

In [20]:
def get_sad(song_embeddings, sad_sess, sad_input, sad_output):
    """
    Run the sad model on song embeddings and return its two scores.

    Only the first row of the predictions is used.

    NOTE(review): column 0 is treated as "sad" and column 1 as "non-sad";
    the class order is not visible here and may differ between models -
    confirm against the model metadata.

    Parameters:
    - song_embeddings: Embeddings fed to the model input.
    - sad_sess: Session holding the sad model.
    - sad_input: Input tensor of the model.
    - sad_output: Output tensor of the model.

    Returns:
    - tuple: (sad, non_sad)
    """
    feed = {sad_input: song_embeddings}
    first_row = sad_sess.run(sad_output, feed_dict=feed)[0]

    return first_row[0], first_row[1]

## get_relaxed
Function to extract relaxed score using Essentia model.

In [21]:
def get_relaxed(song_embeddings, relaxed_sess, relaxed_input, relaxed_output):
    """
    Run the relaxed model on song embeddings and return its two scores.

    Only the first row of the predictions is used.

    NOTE(review): column 0 is treated as "relaxed" and column 1 as
    "non-relaxed"; the class order is not visible here and may differ between
    models - confirm against the model metadata.

    Parameters:
    - song_embeddings: Embeddings fed to the model input.
    - relaxed_sess: Session holding the relaxed model.
    - relaxed_input: Input tensor of the model.
    - relaxed_output: Output tensor of the model.

    Returns:
    - tuple: (relaxed, non_relaxed)
    """
    feed = {relaxed_input: song_embeddings}
    first_row = relaxed_sess.run(relaxed_output, feed_dict=feed)[0]

    return first_row[0], first_row[1]

## get_party
Function to extract party score using Essentia model.

In [22]:
def get_party(song_embeddings, party_sess, party_input, party_output):
    """
    Run the party model on song embeddings and return its two scores.

    Only the first row of the predictions is used.

    NOTE(review): column 0 is treated as "party" and column 1 as "non-party";
    the class order is not visible here and may differ between models -
    confirm against the model metadata.

    Parameters:
    - song_embeddings: Embeddings fed to the model input.
    - party_sess: Session holding the party model.
    - party_input: Input tensor of the model.
    - party_output: Output tensor of the model.

    Returns:
    - tuple: (party, non_party)
    """
    feed = {party_input: song_embeddings}
    first_row = party_sess.run(party_output, feed_dict=feed)[0]

    return first_row[0], first_row[1]

# Define  paths

In [23]:
# Folder that receives the per-chart feature tables.
# NOTE(review): this path is spelled "MyDrive" while the checkpoint paths below
# use "My Drive" - both spellings are kept exactly as they were; confirm that
# both resolve to the same Drive folder.
output_folder = "/content/drive/MyDrive/Music and War/Outputs"

# Model files (.pb), all stored in the same Checkpoints folder
vggish_pb, mood_happy_pb, deam_pb, aggressive_pb, sad_pb, party_pb, relaxed_pb = (
    '/content/drive/My Drive/Music and War/Checkpoints/' + pb_name
    for pb_name in (
        'audioset-vggish-3.pb',
        'mood_happy-audioset-vggish-1.pb',
        'deam-audioset-vggish-2.pb',
        'mood_aggressive-audioset-vggish-1.pb',
        'mood_sad-audioset-vggish-1.pb',
        'mood_party-audioset-vggish-1.pb',
        'mood_relaxed-audioset-vggish-1.pb',
    )
)

# Load Models
Load Essentia and VGGish models used later for processing.

## Load VGGish

In [24]:
# Parse the VGGish graph definition from its .pb file. It is only parsed here;
# get_embeddings imports it into a graph when it is called.
vggish_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(vggish_pb, "rb") as f:
    vggish_graph_def.ParseFromString(f.read())

## Load Happy model

In [25]:
# Step 1: parse the happy model's graph definition from its .pb file.
mood_happy_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(mood_happy_pb, "rb") as f:
    mood_happy_graph_def.ParseFromString(f.read())

# Step 2: import the definition into its own graph; the raw definition is
# dropped afterwards to free memory.
mood_happy_graph = tf.Graph()
with mood_happy_graph.as_default():
    tf.import_graph_def(mood_happy_graph_def, name="")
del mood_happy_graph_def

# Step 3: open a session on that graph and look up the input/output tensors
# that get_happy is called with.
mood_happy_sess = tf.compat.v1.Session(graph=mood_happy_graph)
mood_happy_input = mood_happy_graph.get_tensor_by_name("model/Placeholder:0")
mood_happy_output = mood_happy_graph.get_tensor_by_name("model/Softmax:0")

## Load DEAM model
Used for extraction of Valence/Arousal

In [26]:
# Step 1: parse the DEAM model's graph definition from its .pb file.
deam_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(deam_pb, "rb") as f:
    deam_graph_def.ParseFromString(f.read())

# Step 2: import the definition into its own graph; the raw definition is
# dropped afterwards to free memory.
deam_graph = tf.Graph()
with deam_graph.as_default():
    tf.import_graph_def(deam_graph_def, name="")
del deam_graph_def

# Step 3: open a session on that graph and look up the input/output tensors
# that get_deam is called with (adjust the tensor names if needed).
deam_sess = tf.compat.v1.Session(graph=deam_graph)
deam_input = deam_graph.get_tensor_by_name("model/Placeholder:0")
deam_output = deam_graph.get_tensor_by_name("model/Identity:0")

## Load Aggressive model
Used for extraction of aggressive feature

In [27]:
# Step 1: parse the aggressive model's graph definition from its .pb file.
aggressive_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(aggressive_pb, "rb") as f:
    aggressive_graph_def.ParseFromString(f.read())

# Step 2: import the definition into its own graph; the raw definition is
# dropped afterwards to free memory.
aggressive_graph = tf.Graph()
with aggressive_graph.as_default():
    tf.import_graph_def(aggressive_graph_def, name="")
del aggressive_graph_def

# Step 3: open a session on that graph and look up the input/output tensors
# that get_aggressive is called with (adjust the tensor names if needed).
aggressive_sess = tf.compat.v1.Session(graph=aggressive_graph)
aggressive_input = aggressive_graph.get_tensor_by_name("model/Placeholder:0")
aggressive_output = aggressive_graph.get_tensor_by_name("model/Softmax:0")

## Load sad model
Used for extraction of sad feature

In [28]:
# Step 1: parse the sad model's graph definition from its .pb file.
sad_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(sad_pb, "rb") as f:
    sad_graph_def.ParseFromString(f.read())

# Step 2: import the definition into its own graph; the raw definition is
# dropped afterwards to free memory.
sad_graph = tf.Graph()
with sad_graph.as_default():
    tf.import_graph_def(sad_graph_def, name="")
del sad_graph_def

# Step 3: open a session on that graph and look up the input/output tensors
# that get_sad is called with (adjust the tensor names if needed).
sad_sess = tf.compat.v1.Session(graph=sad_graph)
sad_input = sad_graph.get_tensor_by_name("model/Placeholder:0")
sad_output = sad_graph.get_tensor_by_name("model/Softmax:0")

## Load relaxed model
Used for extraction of relaxed feature

In [29]:
# Step 1: parse the relaxed model's graph definition from its .pb file.
relaxed_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(relaxed_pb, "rb") as f:
    relaxed_graph_def.ParseFromString(f.read())

# Step 2: import the definition into its own graph; the raw definition is
# dropped afterwards to free memory.
relaxed_graph = tf.Graph()
with relaxed_graph.as_default():
    tf.import_graph_def(relaxed_graph_def, name="")
del relaxed_graph_def

# Step 3: open a session on that graph and look up the input/output tensors
# that get_relaxed is called with (adjust the tensor names if needed).
relaxed_sess = tf.compat.v1.Session(graph=relaxed_graph)
relaxed_input = relaxed_graph.get_tensor_by_name("model/Placeholder:0")
relaxed_output = relaxed_graph.get_tensor_by_name("model/Softmax:0")

## Load party model
Used for extraction of party feature

In [30]:
# Step 1: parse the party model's graph definition from its .pb file.
party_graph_def = tf.compat.v1.GraphDef()
with tf.io.gfile.GFile(party_pb, "rb") as f:
    party_graph_def.ParseFromString(f.read())

# Step 2: import the definition into its own graph; the raw definition is
# dropped afterwards to free memory.
party_graph = tf.Graph()
with party_graph.as_default():
    tf.import_graph_def(party_graph_def, name="")
del party_graph_def

# Step 3: open a session on that graph and look up the input/output tensors
# that get_party is called with (adjust the tensor names if needed).
party_sess = tf.compat.v1.Session(graph=party_graph)
party_input = party_graph.get_tensor_by_name("model/Placeholder:0")
party_output = party_graph.get_tensor_by_name("model/Softmax:0")

# Main
Iterate over all songs in all CSV files. Analyze each song using the loaded models and Essentia features. Record the results in a table.

In [46]:
# Analyze every song of every chart CSV in folder_path.
# For each song: reuse the cached features when available, otherwise download
# the audio, extract all features and add them to the cache. One
# "<chart>_features.csv" file is written to output_folder per chart.
folder_path = '/content/drive/My Drive/Music and War/Datasets/'
file_list, N_files = list_csv_files_in_folder(folder_path)
os.makedirs(output_folder, exist_ok=True)  # Make sure the export below cannot fail on a missing folder

for file_ind in range(N_files): # Iterate over all CSV files
    current_file_name = file_list.iat[file_ind,0] # Current CSV file name
    current_chart, N_songs = read_csv_file(current_file_name, folder_path) # Read the current CSV
    current_chart = add_columns(current_chart, feature_list) # Add feature columns
    for song_ind in range(N_songs): # Iterate over all songs in the current CSV
        current_song = get_song_details(current_chart, song_ind) # Song name and artist name, in a dictionary
        song_id = f"{current_song['song_name']} - {current_song['artist_name']}" # Song identifier for cache
        if song_id in cached_songs.index:  # Check if song is in cache
            sp_results = get_song_features_from_cache(song_id, cached_songs, feature_list)
        else:
            audio_file_name, youtube_url, youtube_title = search_and_download(song_ind+1, current_song)
            if audio_file_name != 'no_file': # File was found on YouTube
                try:
                    sp_results = analyze_song(audio_file_name) # Get signal processing results, danceability...
                    vggsih_audio_input = preprocess_audio(audio_file_name) # Convert audio to MEL spectrogram for VGGish
                    song_embeddings = get_embeddings(vggsih_audio_input, vggish_graph_def) # Get song VGGish embeddings
                finally:
                    # The WAV is only needed for the two steps above; remove it so
                    # the disk does not fill up over thousands of songs.
                    if os.path.exists(audio_file_name):
                        os.remove(audio_file_name)
                sp_results['happy'], sp_results['non_happy'] = get_happy(song_embeddings, mood_happy_sess, mood_happy_input, mood_happy_output)
                sp_results['arousal'], sp_results['valence'] = get_deam(song_embeddings, deam_sess, deam_input, deam_output)
                sp_results['aggressive'], sp_results['non_aggressive'] = get_aggressive(song_embeddings, aggressive_sess, aggressive_input, aggressive_output)
                sp_results['sad'], sp_results['non_sad'] = get_sad(song_embeddings, sad_sess, sad_input, sad_output)
                sp_results['relaxed'], sp_results['non_relaxed'] = get_relaxed(song_embeddings, relaxed_sess, relaxed_input, relaxed_output)
                sp_results['party'], sp_results['non_party'] = get_party(song_embeddings, party_sess, party_input, party_output)
            else: # File not found on YouTube
                sp_results = update_results_missing(sp_results) # Set all features to -9999
                print("not found song, updated features with -9999")
                print(sp_results)

            # Build the cache row by column name. A positional list fails with
            # "mismatched columns" as soon as the cache file carries an extra
            # column (e.g. the manually added 'youtube_url_manual'); columns not
            # set here are left empty (NaN).
            cache_row = {'song_name': current_song['song_name'], 'artist_name': current_song['artist_name']}
            for feature in feature_list:
                cache_row[feature] = sp_results.get(feature, '-9999')
            cached_songs.loc[song_id] = pd.Series(cache_row).reindex(cached_songs.columns)
            cached_songs.to_csv(cache_file)  # Save after every new song; nothing to save on a cache hit

        current_chart = update_chart(current_chart, song_ind, sp_results) # Update the extracted features in the current chart
    # Export feature output for this csv file
    file_name = current_file_name.replace(".csv", "_features.csv")
    file_path = os.path.join(output_folder, file_name)
    current_chart.to_csv(file_path, index=False)  # Save to CSV


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Retrieving cached features for I Wanna Be Yours - Arctic Monkeys
Retrieving cached features for Calm Down (with Selena Gomez) - Rema, Selena Gomez
Number of songs found: 50
Retrieving cached features for קרן שמש - Benaia Barabi
Retrieving cached features for סהרה - Live - Tuna, Jasmin Moallem
Retrieving cached features for אני - Omer Adam
Retrieving cached features for האמת - Odeya, רואי אדם
Retrieving cached features for עושה לי צרות - Eden Hason
Retrieving cached features for I'm Good (Blue) - David Guetta, Bebe Rexha
Retrieving cached features for משוגעת - Gal Adam, NOROZ
Retrieving cached features for מאמי - Odeya, Noa Kirel
Retrieving cached features for Unholy (feat. Kim Petras) - Sam Smith, Kim Petras
Retrieving cached features for היי בייב - Tuna
Retrieving cached features for החיים שלי אחריך - Odeya
Retrieving cached features for Kill Bill - SZA
Retrieving cached features for פנתרה - Noa Kirel
Retrieving cached f

# Update cache
Scan the cache. For every song that has no features but has a manually inserted YouTube URL, extract the features and update the cache.

In [None]:
def _feature_is_missing(value):
    """Return True for an empty cell or for the -9999 'missing song' marker."""
    if pd.isna(value):
        return True
    try:
        return float(value) == -9999
    except (TypeError, ValueError):
        return False  # Non-numeric text is a real value, not the marker


# Fill in features for cached songs that have none yet but have a manually
# entered YouTube URL in the 'youtube_url_manual' column.
if 'youtube_url_manual' not in cached_songs.columns:
    print("Cache has no 'youtube_url_manual' column; nothing to update.")
else:
    for song_id in cached_songs.index:  # Iterate over all cached songs
        youtube_url = cached_songs.loc[song_id, 'youtube_url_manual']
        if pd.isna(youtube_url) or not str(youtube_url).strip():
            continue  # No manual URL for this song
        if not any(_feature_is_missing(cached_songs.loc[song_id, feature]) for feature in feature_list):
            continue  # Features already present; do not download the song again

        artist_name, song_name = cached_songs.loc[song_id, ['artist_name', 'song_name']]
        audio_file_name, this_youtube_url, this_video_title = download_audio(song_id, youtube_url, artist_name, song_name)

        if audio_file_name != 'no_file':  # File was found on YouTube
            print('audio_file_name:')
            print(audio_file_name)
            try:
                sp_results = analyze_song(audio_file_name)  # Get signal processing results, danceability...
                vggsih_audio_input = preprocess_audio(audio_file_name)  # Convert audio to MEL spectrogram for VGGish
                song_embeddings = get_embeddings(vggsih_audio_input, vggish_graph_def)  # Get song VGGish embeddings
            finally:
                # The WAV is only needed for the steps above; remove it afterwards
                if os.path.exists(audio_file_name):
                    os.remove(audio_file_name)

            sp_results['happy'], sp_results['non_happy'] = get_happy(song_embeddings, mood_happy_sess, mood_happy_input, mood_happy_output)
            sp_results['arousal'], sp_results['valence'] = get_deam(song_embeddings, deam_sess, deam_input, deam_output)
            sp_results['aggressive'], sp_results['non_aggressive'] = get_aggressive(song_embeddings, aggressive_sess, aggressive_input, aggressive_output)
            sp_results['sad'], sp_results['non_sad'] = get_sad(song_embeddings, sad_sess, sad_input, sad_output)
            sp_results['relaxed'], sp_results['non_relaxed'] = get_relaxed(song_embeddings, relaxed_sess, relaxed_input, relaxed_output)
            sp_results['party'], sp_results['non_party'] = get_party(song_embeddings, party_sess, party_input, party_output)

            # Update cached_songs with extracted features
            cached_songs.loc[song_id, feature_list] = [sp_results.get(feature, '-9999') for feature in feature_list]
        else:  # File not found on YouTube
            sp_results = update_results_missing(sp_results)  # Set all features to -9999
            print("Not found song, updated features with -9999")
            print(sp_results)
            cached_songs.loc[song_id, feature_list] = ['-9999'] * len(feature_list)

        # Save after every processed song (not once per cached row)
        cached_songs.to_csv(cache_file)

# Debug