In [83]:

import pylast
import os, multiprocessing as mp
import sys
from dotenv import load_dotenv
import pandas as pd
import numpy
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import ComplementNB
from sklearn.metrics import mean_squared_error, r2_score

sys.path.append(os.path.abspath('..'))
from music_sentiment.lastfm_api import get_lastfm_network, get_song_tags
from music_sentiment.tag_utils import process_tag_weights

#load the environment variables from the .env file
load_dotenv()


True

In [25]:
# Load the cleaned dataset into `df`.
# The path is relative, so it resolves against the notebook's working directory.

df = pd.read_csv("../data/cleaned/muse_cleaned.csv")


In [None]:
import concurrent.futures
import json
import time
from pathlib import Path
from tqdm import tqdm

# Location of the on-disk Last.fm tag cache (relative to the working directory).
# The directory is created up front so later writes to `cache_file` cannot fail
# on a missing parent folder.
cache_dir = Path("../data/cache")
cache_file = cache_dir.joinpath("lastfm_song_tags_cache.json")
cache_dir.mkdir(exist_ok=True, parents=True)

# Initialize or load cache
def load_cache():
    """Load the tag cache from ``cache_file``.

    Returns:
        dict: The cached mapping read from disk, or an empty dict when the
        file does not exist, cannot be parsed as JSON, or does not contain a
        JSON object.

    The cache is disposable, so an unreadable file (for example one left
    half-written by an interrupted run) is reported and ignored rather than
    aborting the whole run.
    """
    if not cache_file.exists():
        return {}

    try:
        with open(cache_file, 'r') as f:
            cached = json.load(f)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"Warning: ignoring unreadable cache file {cache_file}: {e}")
        return {}

    if not isinstance(cached, dict):
        # Callers index the cache by key, so anything but a JSON object is unusable.
        print(f"Warning: ignoring cache file {cache_file}: expected a JSON object")
        return {}

    return cached

def save_cache(cache):
    """Write the tag cache to ``cache_file`` as JSON without risking the old file.

    Args:
        cache: Mapping to serialise. It may be mutated by other threads while
            this function runs, so a shallow snapshot is taken first.

    Raises:
        TypeError / ValueError: If the cache contains values ``json`` cannot
            serialise. The previously saved cache file is left untouched.
        OSError: If the temporary file cannot be written or moved into place.

    The data is written to a uniquely named temporary file in the same
    directory and then moved over ``cache_file`` with ``os.replace``, so a
    failed or interrupted dump never leaves a truncated cache behind.
    """
    import tempfile  # local import: only this function needs it

    # Copy before serialising: iterating a dict that another thread inserts
    # into raises "dictionary changed size during iteration".
    snapshot = dict(cache)

    fd, tmp_name = tempfile.mkstemp(
        dir=str(cache_file.parent),
        prefix=cache_file.name + ".",
        suffix=".tmp",
    )
    try:
        with os.fdopen(fd, 'w') as f:
            json.dump(snapshot, f)
        os.replace(tmp_name, cache_file)
    except BaseException:
        # Do not leave stray temp files next to the cache on failure.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise

# Modified function to use cache
def get_song_tags_cached(network, artist, track, limit=20, cache=None):
    """Return the tags for a song, consulting ``cache`` before calling the API.

    Args:
        network: Handle passed through unchanged to ``get_song_tags``.
        artist: Artist name. Coerced with ``str()`` for the cache key so a
            missing value (e.g. NaN) cannot crash key construction.
        track: Track name, handled like ``artist``.
        limit: Passed through unchanged to ``get_song_tags``.
        cache: Optional dict keyed by ``"<artist>|<track>"`` (lower-cased,
            stripped). When given, hits are served from it and new results
            are stored in it.

    Returns:
        Whatever ``get_song_tags`` returned (or the cached value), or ``[]``
        when the fetch raised.
    """
    # Create a unique key for this artist-track pair
    cache_key = f"{str(artist).lower().strip()}|{str(track).lower().strip()}"

    # Check if we have this in our cache
    if cache is not None and cache_key in cache:
        return cache[cache_key]

    # Not in cache, fetch from API. Only the fetch is best-effort here.
    try:
        tags = get_song_tags(network, artist, track, limit)
    except Exception as e:
        print(f"Error fetching tags for {artist} - {track}: {e}")
        return []

    if cache is not None:
        cache[cache_key] = tags
        # Periodically save cache to disk (whenever its size is a multiple of 10).
        if len(cache) % 10 == 0:
            try:
                save_cache(cache)
            except Exception as e:
                # A failed checkpoint must not discard tags we already fetched.
                print(f"Warning: could not save tag cache: {e}")

    return tags

# Worker function for parallel processing
def process_song(args):
    """Fetch and post-process the tags for one song (thread-pool worker).

    Args:
        args: A 4-tuple ``(idx, row, network, cache)`` where ``row`` supports
            lookup of ``'artist'`` and ``'track'``.

    Returns:
        tuple: ``(idx, song_tags, tag_text)``, i.e. the row label, the value
        returned by ``get_song_tags_cached``, and that value passed through
        ``process_tag_weights``.
    """
    row_label, song_row, lastfm, tag_cache = args
    artist_name = song_row['artist']
    track_name = song_row['track']

    # Crude rate limiting: every call pauses for a fixed 0.2 s before the lookup
    # (the pause is constant, not random, and also applies to cache hits).
    time.sleep(0.2)

    raw_tags = get_song_tags_cached(lastfm, artist_name, track_name, cache=tag_cache)

    # Collapse the tags into the text representation used downstream.
    return row_label, raw_tags, process_tag_weights(raw_tags)

# Main processing function with parallel execution
def fetch_tags_parallel(df_sample, max_workers=4):
    """Fetch tags for every row of ``df_sample`` using a thread pool.

    Args:
        df_sample: DataFrame whose rows are handed to ``process_song``.
        max_workers: Number of worker threads.

    Returns:
        tuple[list, list]: ``(tags_with_weights, tags_text_list)``, both in
        the row order of ``df_sample``. A row whose worker raised gets ``[]``
        and ``""`` respectively instead of aborting the whole run.

    The cache loaded at the start is always written back, even when the run
    is interrupted or fails part-way, so completed lookups are not lost.
    """
    # Load cache
    cache = load_cache()

    network = get_lastfm_network()
    total_songs = len(df_sample)

    print(f"Fetching tags for {total_songs} songs using {max_workers} workers...")

    # Row label -> (song_tags, tag_text)
    results = {}

    # One task per row; every task shares the same network handle and cache dict.
    tasks = [(idx, row, network, cache) for idx, row in df_sample.iterrows()]

    try:
        # Threads (not processes): the work is dominated by waiting on API calls.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(process_song, task): task[0] for task in tasks}

            # Use tqdm for a progress bar
            for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
                try:
                    idx, song_tags, tag_text = future.result()
                except Exception as e:
                    # One bad row must not discard the rest; it is filled with
                    # empty values by the fallback branch below.
                    print(f"Error processing row {futures[future]}: {e}")
                    continue
                results[idx] = (song_tags, tag_text)
    finally:
        # Save the final cache regardless of how the pool loop ended.
        save_cache(cache)

    # Extract results in the same order as the input DataFrame
    tags_with_weights = []
    tags_text_list = []

    for idx in df_sample.index:
        if idx in results:
            song_tags, tag_text = results[idx]
            tags_with_weights.append(song_tags)
            tags_text_list.append(tag_text)
        else:
            # Row whose worker failed: keep list lengths aligned with the frame.
            tags_with_weights.append([])
            tags_text_list.append("")

    return tags_with_weights, tags_text_list

# Main execution code
sample_size = 2000
# Cap the request at the number of available rows: DataFrame.sample raises
# ValueError when asked for more rows than exist (sampling without replacement).
df_sample = df.sample(min(sample_size, len(df)), random_state=42)

# Fetch tags in parallel
tags_with_weights, tags_text_list = fetch_tags_parallel(df_sample, max_workers=4)

# Add the tags to the DataFrame (both lists are in df_sample's row order)
df_sample['tags_with_weights'] = tags_with_weights
df_sample['tag_text'] = tags_text_list

# Ensure no NaN values in tag_text by replacing any potential empty values with empty strings
df_sample['tag_text'] = df_sample['tag_text'].fillna("")

# Save the DataFrame with tags to a CSV file.
# NOTE: empty tag_text strings are written as empty fields, which
# pd.read_csv reads back as NaN by default, so consumers must handle that.
df_sample.to_csv("../data/cleaned/muse_with_tags_valence.csv", index=False)

print("Processing complete. Data saved to ../data/cleaned/muse_with_tags_valence.csv")

In [79]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, classification_report

# Load the dataset with tags
df_sample = pd.read_csv("../data/cleaned/muse_with_tags_valence.csv")

# Bin the valence scores into discrete categories
bins = [0, 2.5, 5, 7.5, 10]  # You can adjust these bins
labels = [0, 1, 2, 3]    # Numeric labels for the bins
# include_lowest=True keeps a valence of exactly 0 in the first bin; without it
# the left edge is open and such rows would get a NaN label.
df_sample['valence_binned'] = pd.cut(
    df_sample['valence_tags'], bins=bins, labels=labels, include_lowest=True
)

# Build features AND target from the same filtered frame so X and y always have
# the same rows. Songs without tags are read back from the CSV as NaN tag_text
# and are dropped here, as are rows whose valence fell outside the bins.
df_sample_filtered = df_sample.dropna(subset=['tag_text', 'valence_binned'])

# TF-IDF over the tag text; ComplementNB works with the sparse matrix directly
vectorizer = TfidfVectorizer(min_df=1, max_df=0.7)
X = vectorizer.fit_transform(df_sample_filtered['tag_text'])

# Use the binned values as target
y = df_sample_filtered['valence_binned']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Use ComplementNB which works well with sparse features
model = ComplementNB()
model.fit(X_train, y_train)  # No need for .toarray()

# Make predictions
y_pred = model.predict(X_test)

# Evaluate
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(classification_report(y_test, y_pred))



Accuracy: 0.6167
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        12
           1       0.83      0.34      0.48       207
           2       0.64      0.87      0.74       337
           3       0.50      0.11      0.19        44

    accuracy                           0.62       600
   macro avg       0.49      0.33      0.35       600
weighted avg       0.68      0.62      0.59       600



In [80]:
from joblib import dump
import os
from sklearn.metrics import accuracy_score, classification_report

# Score the fitted model on the rows it was trained on, reported in the same
# format as the test-set evaluation, so the two can be compared for overfitting.
y_train_pred = model.predict(X_train)

train_accuracy = accuracy_score(y_train, y_train_pred)
print(f"Training Data Accuracy: {train_accuracy:.4f}")

train_report = classification_report(y_train, y_train_pred)
print(train_report)



Training Data Accuracy: 0.7357
              precision    recall  f1-score   support

           0       0.14      0.23      0.17        44
           1       0.95      0.54      0.69       485
           2       0.71      0.94      0.81       777
           3       1.00      0.30      0.46        94

    accuracy                           0.74      1400
   macro avg       0.70      0.50      0.53      1400
weighted avg       0.80      0.74      0.72      1400



In [None]:

# Make sure the output folder exists before writing into it
os.makedirs('../models', exist_ok=True)

# Persist the classifier together with the vectorizer it was trained with.
# NOTE: `model` and `vectorizer` are whatever those names currently hold; a later
# cell re-fits `vectorizer` and rebinds `model`, so run this cell before that one
# to keep the saved pair consistent.
for artifact, destination in (
    (model, '../models/valence_classifier.joblib'),
    (vectorizer, '../models/tfidf_vectorizer.joblib'),
):
    dump(artifact, destination)

print("Model and vectorizer saved successfully!")

## A new approach: using the Warriner word list to map Last.fm tags to emotion-rated words

In [81]:
# Load the cleaned Warriner lexicon (one row per word)
warriner_df = pd.read_csv('../data/cleaned/warriner_clean.csv')

# word -> (valence, arousal, dominance) lookup table for fast access.
# If a word appears more than once, the last row wins.
warriner_dict = {
    word: (valence, arousal, dominance)
    for word, valence, arousal, dominance in zip(
        warriner_df['word'],
        warriner_df['valence_score'],
        warriner_df['arousal_score'],
        warriner_df['dominance_score'],
    )
}

# Function to extract Warriner features from tags
def extract_warriner_features(tag_text, warriner_df,
                              score_columns=('V.Mean.Sum', 'A.Mean.Sum', 'D.Mean.Sum')):
    """Summarise the Warriner emotion scores of the words in a tag string.

    Args:
        tag_text: Whitespace-separated tag words. Anything that is not a
            string (``None``, NaN, ...) is treated as "no tags".
        warriner_df: Lexicon DataFrame **indexed by word**; membership is
            tested against ``warriner_df.index``.
        score_columns: Names of the (valence, arousal, dominance) columns in
            ``warriner_df``, in that order. Defaults to the column names this
            function has always used; pass other names if the lexicon file
            uses a different header.

    Returns:
        dict: ``warriner_valence_mean``, ``warriner_arousal_mean`` and
        ``warriner_dominance_mean`` (mean score over the distinct matched
        words, 0.0 when nothing matched) and ``warriner_coverage`` (fraction
        of distinct words found in the lexicon, 0.0 for empty input).

    Raises:
        KeyError: If a name in ``score_columns`` is not a column of
            ``warriner_df`` and at least one word matched.
    """
    features = {
        'warriner_valence_mean': 0.0,
        'warriner_arousal_mean': 0.0,
        'warriner_dominance_mean': 0.0,
        'warriner_coverage': 0.0
    }

    # Missing tag text arrives as None/NaN from pandas; there is nothing to score.
    if not isinstance(tag_text, str):
        return features

    # Distinct lower-cased words; repeated tags count once.
    words = set(tag_text.lower().split())
    if not words:
        return features

    # Sorted so the lookup (and therefore float summation) order is deterministic.
    lexicon_words = warriner_df.index
    matched_words = sorted(word for word in words if word in lexicon_words)

    features['warriner_coverage'] = len(matched_words) / len(words)

    if matched_words:
        valence_col, arousal_col, dominance_col = score_columns
        # One lookup for all matched words and all three score columns.
        means = warriner_df.loc[matched_words, [valence_col, arousal_col, dominance_col]].mean()
        features['warriner_valence_mean'] = float(means[valence_col])
        features['warriner_arousal_mean'] = float(means[arousal_col])
        features['warriner_dominance_mean'] = float(means[dominance_col])

    return features

In [82]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy.sparse import hstack
# Load Warriner wordlist, indexed by word so membership checks hit the index
warriner_df = pd.read_csv('../data/cleaned/warriner_clean.csv', index_col='word')

# Compute the Warriner features for every row in one pass and attach them as
# columns. Rows without tag text are scored as "no tags" (all zeros).
warriner_cols = ['warriner_valence_mean', 'warriner_arousal_mean', 'warriner_dominance_mean', 'warriner_coverage']
feature_rows = [
    extract_warriner_features(text, warriner_df)
    for text in df_sample['tag_text'].fillna('')
]
warriner_features = pd.DataFrame(feature_rows, index=df_sample.index, columns=warriner_cols)
df_sample[warriner_cols] = warriner_features

# Scale the numeric features into [0, 1]. ComplementNB rejects negative inputs,
# so a zero-mean scaler such as StandardScaler cannot be used here: it only
# appears to work while every Warriner feature is constant.
X_warriner = df_sample[warriner_cols].fillna(0).values
scaler = MinMaxScaler()
X_warriner_scaled = scaler.fit_transform(X_warriner)

# Get your text features.
# NOTE: this re-fits the shared `vectorizer`, replacing the vocabulary learned by
# any earlier cell that fitted it.
X_tfidf = vectorizer.fit_transform(df_sample['tag_text'].fillna(''))

# Combine features (CSR so the matrix supports the row indexing used by the split)
X_combined = hstack([X_tfidf, X_warriner_scaled]).tocsr()

# Split for training/testing
X_train, X_test, y_train, y_test = train_test_split(X_combined, df_sample['emotion'], test_size=0.3, random_state=42)

# Train your model
model = ComplementNB()
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(classification_report(y_test, y_pred))

Accuracy: 0.7333
              precision    recall  f1-score   support

       happy       0.72      0.96      0.82       381
         sad       0.82      0.34      0.48       219

    accuracy                           0.73       600
   macro avg       0.77      0.65      0.65       600
weighted avg       0.76      0.73      0.70       600

