# Fast Song Matcher Using Pre-processed Embeddings

This notebook provides a system that matches song lyrics to simple text tags (like "beach", "happy", "sad", etc.) by:
1. Pre-processing all song data and creating embeddings once
2. Saving processed data to disk for quick lookup
3. Using a hybrid semantic and keyword-based approach for better matching

## Setup
First, let's install the required packages if they're not already installed.

In [1]:
# Install required packages
!pip install sentence-transformers tqdm



In [5]:
# Import libraries
from sentence_transformers import SentenceTransformer, util
import pandas as pd
import torch
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
import re
import os
import pickle
from tqdm.notebook import tqdm
from ipywidgets import IntProgress as IProgress

# Choose the compute device: start from CPU and upgrade to Apple MPS when it
# is available, otherwise to CUDA when that is available.
device = torch.device("cpu")
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
print(f"Using device: {device}")

Using device: mps


## SongMatcher Class
This class handles data processing, embedding creation, on-disk caching, and tag lookup.

In [3]:
class SongMatcher:
    """Match free-text tags to songs using cached embeddings plus TF-IDF keywords.

    On construction the matcher either loads four cache files from the current
    working directory (processed DataFrame, per-song embeddings, fitted TF-IDF
    vectorizer, TF-IDF matrix) or, if any of them is missing, builds them from
    ``csv_path`` and writes them out.

    The class relies on a notebook-level ``device`` variable being defined
    before it is instantiated; the sentence-transformer model is moved to it.

    NOTE(review): only the embeddings cache file name depends on the model
    name, and none of the cache names depend on ``csv_path``; caches are
    reused whenever the files exist.
    """

    # Lyrics are truncated to this many characters before being embedded.
    MAX_LYRICS_CHARS = 512
    # Batch size handed to the model's ``encode`` when embedding the corpus.
    ENCODE_BATCH_SIZE = 32

    def __init__(self, csv_path='songs_lyrics_adjusted.csv', model_name="all-mpnet-base-v2"):
        """Configure paths and tag expansions, then load or build all data.

        Args:
            csv_path: CSV file read by ``process_data``; the code accesses the
                ``title``, ``lyrics`` and ``artist`` columns.
            model_name: Name passed to ``SentenceTransformer``.
        """
        self.model_name = model_name
        self.csv_path = csv_path
        self.embeddings_path = f"song_embeddings_{os.path.basename(model_name)}.pkl"
        self.tfidf_path = "song_tfidf_model.pkl"
        self.tfidf_matrix_path = "song_tfidf_matrix.npz"
        self.processed_df_path = "processed_songs_df.pkl"

        # Tag expansion dictionary: a short tag is replaced by a richer query
        # string before it is embedded / vectorized.
        self.tag_expansions = {
            "beach": "beach ocean sea sand waves summer coastal shore surf sunshine relax vacation tropical",
            "sad": "sad crying tears sorrow depression melancholy heartbreak grief lonely somber dark emotional pain",
            "happy": "happy joy smile laugh excitement celebration upbeat cheerful euphoria positivity bright",
            "love": "love romance relationship heart passion affection emotion together forever commitment intimacy",
            "party": "party celebration dance club night drinks social friends fun excitement energy",
            "angry": "angry rage fury mad furious hate vengeful aggressive irritated bitter anger upset",
            "chill": "chill relax calm peaceful mellow smooth cool laid-back easy tranquil ambient soothing",
            "workout": "workout exercise fitness gym training cardio running pump motivation strength energy",
            "sleep": "sleep night dream rest peaceful calm quiet lullaby meditation gentle soft soothing",
            "driving": "driving road car journey travel highway cruise ride drive trip adventure moving"
            # Add more as needed
        }

        # Lazily built (title_matrix, lyrics_matrix) pair used for scoring.
        self._embedding_matrices = None

        # Load or process data
        self.load_or_process_data()

    def preprocess_text(self, text):
        """Lower-case ``text``, replace punctuation with spaces, collapse whitespace.

        Non-string input (e.g. NaN from pandas) yields an empty string.
        """
        if not isinstance(text, str):
            return ""
        text = text.lower()
        # Anything that is neither a word character nor whitespace becomes a space.
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text

    def expand_tag(self, tag):
        """Return the expansion string for ``tag`` or ``tag`` itself if none exists.

        The lookup ignores case and surrounding whitespace so that input such
        as ``" Beach "`` still hits the ``"beach"`` expansion.
        """
        lookup_key = tag.strip().lower()
        if lookup_key in self.tag_expansions:
            return self.tag_expansions[lookup_key]
        return tag

    def _load_tfidf_matrix(self):
        """Read the TF-IDF matrix back from ``self.tfidf_matrix_path``.

        ``np.savez_compressed`` stores a SciPy sparse matrix as a pickled
        0-d object array, so the array read from the archive is only a wrapper;
        ``.item()`` unwraps the actual sparse matrix. Without this, row
        indexing on the loaded value raises ``IndexError``.
        """
        # NOTE(security): allow_pickle=True unpickles file content; only load
        # cache files that this notebook wrote itself.
        with np.load(self.tfidf_matrix_path, allow_pickle=True) as archive:
            stored = archive['matrix']
        if stored.dtype == object and stored.ndim == 0:
            stored = stored.item()
        return stored

    def load_or_process_data(self):
        """Load all cached artefacts if every cache file exists, else rebuild them."""
        cache_paths = (
            self.processed_df_path,
            self.embeddings_path,
            self.tfidf_path,
            self.tfidf_matrix_path,
        )
        if not all(os.path.exists(path) for path in cache_paths):
            print("Pre-processed data not found. Processing data...")
            self.process_data()
            return

        print("Loading pre-processed data...")
        # NOTE(security): pickle can execute arbitrary code while loading; these
        # files must come from a trusted source (normally an earlier run here).
        self.df = pd.read_pickle(self.processed_df_path)
        with open(self.embeddings_path, 'rb') as f:
            self.embeddings = pickle.load(f)
        with open(self.tfidf_path, 'rb') as f:
            self.tfidf_vectorizer = pickle.load(f)
        self.tfidf_matrix = self._load_tfidf_matrix()
        self._embedding_matrices = None

        # The model is still needed to encode query tags.
        print("Loading model for query encoding...")
        self.model = SentenceTransformer(self.model_name)
        self.model = self.model.to(device)

        print(f"Data loaded successfully: {len(self.df)} songs")

    def process_data(self):
        """Build the DataFrame, TF-IDF model and embeddings from the CSV and cache them."""
        print("Loading dataset...")
        self.df = pd.read_csv(self.csv_path)
        self.df["lyrics"] = self.df["lyrics"].fillna("")

        print("Preprocessing text...")
        self.df["processed_lyrics"] = self.df["lyrics"].apply(self.preprocess_text)
        self.df["processed_title"] = self.df["title"].apply(self.preprocess_text)

        # Combined field kept in the cached DataFrame.
        self.df["title_lyrics"] = self.df["processed_title"] + " " + self.df["processed_lyrics"]

        print("Building TF-IDF model...")
        self.tfidf_vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')
        self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.df["processed_lyrics"])

        print(f"Loading {self.model_name} model...")
        self.model = SentenceTransformer(self.model_name)
        self.model = self.model.to(device)

        print("Generating embeddings for all songs (this may take a while)...")
        titles = self.df["processed_title"].tolist()
        # Truncate lyrics for efficiency; empty lyrics are embedded as the
        # placeholder word "empty".
        lyrics = [
            text[:self.MAX_LYRICS_CHARS] if len(text) > 0 else "empty"
            for text in self.df["processed_lyrics"]
        ]
        # Encoding whole lists lets the model batch the work instead of running
        # one forward pass per song; results come back as CPU NumPy arrays.
        title_embs = self.model.encode(
            titles,
            batch_size=self.ENCODE_BATCH_SIZE,
            show_progress_bar=True,
            convert_to_numpy=True,
        )
        lyrics_embs = self.model.encode(
            lyrics,
            batch_size=self.ENCODE_BATCH_SIZE,
            show_progress_bar=True,
            convert_to_numpy=True,
        )

        # Same on-disk layout as before: one dict per song keyed by index label.
        self.embeddings = {
            label: {
                "title_embedding": title_embs[position],
                "lyrics_embedding": lyrics_embs[position],
            }
            for position, label in enumerate(self.df.index)
        }
        self._embedding_matrices = None

        print("Saving processed data...")
        self.df.to_pickle(self.processed_df_path)
        with open(self.embeddings_path, 'wb') as f:
            pickle.dump(self.embeddings, f)
        with open(self.tfidf_path, 'wb') as f:
            pickle.dump(self.tfidf_vectorizer, f)
        # The sparse matrix is stored as a pickled object inside the archive;
        # _load_tfidf_matrix unwraps it again.
        np.savez_compressed(self.tfidf_matrix_path, matrix=self.tfidf_matrix)

        print("Data processing complete!")

    def _get_embedding_matrices(self):
        """Return ``(title_matrix, lyrics_matrix)`` with one row per song in ``self.df`` order."""
        cached = getattr(self, "_embedding_matrices", None)
        if cached is None or cached[0].shape[0] != len(self.df):
            labels = list(self.df.index)
            title_matrix = np.stack([
                np.asarray(self.embeddings[label]["title_embedding"], dtype=np.float32).ravel()
                for label in labels
            ])
            lyrics_matrix = np.stack([
                np.asarray(self.embeddings[label]["lyrics_embedding"], dtype=np.float32).ravel()
                for label in labels
            ])
            cached = (title_matrix, lyrics_matrix)
            self._embedding_matrices = cached
        return cached

    @staticmethod
    def _cosine_to_rows(query, matrix):
        """Cosine similarity between a 1-D ``query`` and every row of ``matrix``."""
        norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
        # Guard against zero vectors so the division never produces NaN.
        return (matrix @ query) / np.maximum(norms, 1e-12)

    def find_matching_songs(self, tag, num_results=5, title_weight=0.3, semantic_weight=0.7, keyword_weight=0.3):
        """Rank songs against ``tag`` using pre-computed embeddings and TF-IDF.

        Args:
            tag: Query tag; expanded through ``expand_tag`` before scoring.
            num_results: Number of top rows to return.
            title_weight: Share of the semantic score taken from the title
                similarity; the lyrics similarity gets ``1 - title_weight``.
            semantic_weight: Multiplier for the semantic score.
            keyword_weight: Multiplier for the TF-IDF keyword score.

        Returns:
            A DataFrame sorted by ``combined_score`` (descending) with columns
            ``index``, ``title``, ``artist``, ``combined_score``,
            ``title_sim``, ``lyrics_sim`` and ``keyword_sim``.
        """
        expanded_tag = self.expand_tag(tag)
        print(f"Finding songs matching tag: '{tag}'")
        if tag != expanded_tag:
            print(f"Expanded to: '{expanded_tag}'")

        columns = ["index", "title", "artist", "combined_score",
                   "title_sim", "lyrics_sim", "keyword_sim"]
        song_count = len(self.df)
        if song_count == 0:
            return pd.DataFrame(columns=columns)

        # Encode the query once, on the CPU side, as a flat float vector.
        tag_embedding = np.asarray(
            self.model.encode(expanded_tag, convert_to_numpy=True),
            dtype=np.float32,
        ).ravel()
        tag_tfidf = self.tfidf_vectorizer.transform([expanded_tag])

        # Score every song in one vectorised pass instead of a Python loop.
        title_matrix, lyrics_matrix = self._get_embedding_matrices()
        title_sim = self._cosine_to_rows(tag_embedding, title_matrix)
        lyrics_sim = self._cosine_to_rows(tag_embedding, lyrics_matrix)
        # Sparse (songs x vocab) @ (vocab x 1); rows without terms score 0.
        keyword_sim = np.asarray((self.tfidf_matrix @ tag_tfidf.T).toarray()).ravel()

        semantic_sim = title_weight * title_sim + (1 - title_weight) * lyrics_sim
        combined_score = semantic_weight * semantic_sim + keyword_weight * keyword_sim

        results_df = pd.DataFrame({
            "index": np.arange(song_count),
            "title": self.df["title"].to_numpy(),
            "artist": self.df["artist"].to_numpy(),
            "combined_score": combined_score,
            "title_sim": title_sim,
            "lyrics_sim": lyrics_sim,
            "keyword_sim": keyword_sim,
        }, columns=columns)
        results_df = results_df.sort_values("combined_score", ascending=False, kind="stable")

        return results_df.head(num_results)

## Initialize the Matcher
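The first run pre-processes all data and writes the results to cache files in the working directory (takes a few minutes). Subsequent runs load those files from disk (takes seconds). The caches are reused whenever the files exist, so delete them to force re-processing after changing the CSV.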

In [None]:
# Build the matcher. If the cache files are missing this processes the CSV and
# writes them; otherwise it just loads the cached artefacts from disk.
matcher = SongMatcher(csv_path='songs_lyrics_adjusted.csv')

Pre-processed data not found. Processing data...
Loading dataset...


## Find Matching Songs
Now we can quickly find songs matching any tag!

In [None]:
# Look up the songs that best fit the "beach" tag
tag = "beach"
results = matcher.find_matching_songs(tag, num_results=5)

# Print every hit with its combined score, then the per-signal breakdown
print(f"\nTop 5 songs matching '{tag}':")
for song in results.to_dict("records"):
    print(f"{song['title']} by {song['artist']} (Score: {song['combined_score']:.4f})")
    print(f"  Title sim: {song['title_sim']:.3f}, Lyrics sim: {song['lyrics_sim']:.3f}, Keyword: {song['keyword_sim']:.3f}")

## Try Another Tag

In [None]:
# Look up the songs that best fit the "sad" tag
tag = "sad"
results = matcher.find_matching_songs(tag, num_results=5)

# Print every hit with its combined score, then the per-signal breakdown
print(f"\nTop 5 songs matching '{tag}':")
for song in results.to_dict("records"):
    print(f"{song['title']} by {song['artist']} (Score: {song['combined_score']:.4f})")
    print(f"  Title sim: {song['title_sim']:.3f}, Lyrics sim: {song['lyrics_sim']:.3f}, Keyword: {song['keyword_sim']:.3f}")

## Interactive Search Function
Run this cell to interactively search for songs matching different tags.

In [None]:
from IPython.display import display, clear_output

def interactive_search():
    """Prompt repeatedly for a tag and print the top five matching songs.

    The loop ends when the user enters 'quit' (case-insensitive, surrounding
    whitespace ignored). Blank input is skipped and the prompt is shown again
    instead of running a search on an empty tag. Uses the notebook-level
    ``matcher`` object.
    """
    while True:
        # Strip so that "quit " or " beach" behave like "quit" / "beach".
        tag = input("Enter a tag to find matching songs (or 'quit' to exit): ").strip()

        if not tag:
            continue

        if tag.lower() == 'quit':
            break

        # Find matches
        results = matcher.find_matching_songs(tag, num_results=5)

        # Replace the previous output (including the matcher's progress
        # messages) with the fresh result list.
        clear_output(wait=True)
        print(f"\nTop 5 songs matching '{tag}':")
        for _, row in results.iterrows():
            print(f"{row['title']} by {row['artist']} (Score: {row['combined_score']:.4f})")
            print(f"  Title sim: {row['title_sim']:.3f}, Lyrics sim: {row['lyrics_sim']:.3f}, Keyword: {row['keyword_sim']:.3f}")
        print("\n")

# Run the interactive search
interactive_search()

## Add New Tag Expansions
You can add your own tag expansions to improve matching for specific moods or themes.

In [None]:
# Register an expansion for a tag the matcher does not know yet
matcher.tag_expansions.update({
    "nostalgic": "nostalgic memory remember past childhood reminisce throwback history memories old days youth"
})

# Query with the freshly added tag
results = matcher.find_matching_songs("nostalgic", num_results=5)

# Print every hit with its combined score
print(f"\nTop 5 songs matching 'nostalgic':")
for song in results.to_dict("records"):
    print(f"{song['title']} by {song['artist']} (Score: {song['combined_score']:.4f})")

## Adjust Matching Parameters
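You can tune three weights: `title_weight` splits the semantic score between title similarity and lyrics similarity (lyrics get `1 - title_weight`), while `semantic_weight` and `keyword_weight` scale the semantic score and the TF-IDF keyword score in the final combined score.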

In [None]:
# Re-run the "beach" query with the keyword signal weighted as heavily as the
# semantic signal, and the title contributing less to the semantic score.
tag = "beach"
results = matcher.find_matching_songs(
    tag, num_results=5, title_weight=0.2, semantic_weight=0.5, keyword_weight=0.5
)

# Print every hit with its combined score, then the per-signal breakdown
print(f"\nTop 5 songs matching '{tag}' (prioritizing keyword matches):")
for song in results.to_dict("records"):
    print(f"{song['title']} by {song['artist']} (Score: {song['combined_score']:.4f})")
    print(f"  Title sim: {song['title_sim']:.3f}, Lyrics sim: {song['lyrics_sim']:.3f}, Keyword: {song['keyword_sim']:.3f}")