In [None]:
import pandas as pd
import polars as pl
from pathlib import Path

## 1. baseline

In [None]:
# Locations of the lyrics dataset and of the cached, merged song table.
mxm_dataset_path = 'data/mxm_dataset_train.txt'
merged_songs_path = 'data/songs.csv'


# Build the per-song play-count table once and cache it as CSV; later runs
# only read the cached file. The cache location is taken from
# `merged_songs_path` everywhere so check, write and read cannot drift apart.
if not Path(merged_songs_path).exists():
    print('Merging songs...')
    triplet_columns = ['user_id', 'song_id', 'play_count']
    track_columns = ['track_id', 'song_id', 'artist', 'title']

    # NOTE(review): read_csv treats the first line as a header by default and
    # `new_columns` only renames afterwards. If these files have no header row
    # the first record is dropped -- confirm, and pass has_header=False if so.
    triplet_df = pl.read_csv('data/train_triplets.txt', separator='\t', new_columns=triplet_columns, use_pyarrow=True)
    unique_tracks_df = pl.read_csv('data/p02_unique_tracks.csv', new_columns=track_columns)
    # Total plays per song, most played first.
    triplet_df = triplet_df.group_by('song_id').agg(pl.sum('play_count').alias('play_count')).sort('play_count', descending=True)
    # Left join: songs without a matching track keep null track_id/artist/title.
    mergerd_songs = triplet_df.join(unique_tracks_df, on='song_id', how='left').select('track_id', 'artist', 'title', 'play_count')
    mergerd_songs.write_csv(merged_songs_path)
else:
    print('Reading songs...')
    mergerd_songs = pl.read_csv(merged_songs_path, use_pyarrow=True)

In [None]:
# Display the merged song table (track_id, artist, title, play_count).
mergerd_songs

In [None]:
class MXMDataLoader:
    """Keyword-count track recommender over a bag-of-words lyrics file.

    The dataset file is parsed line by line: lines starting with ``#`` and
    blank lines are skipped, the line starting with ``%`` holds the
    comma-separated vocabulary, and every other line has the form
    ``track_id,<ignored field>,index:count,...`` where ``index`` is a 1-based
    position in the vocabulary.

    Args:
        dataset_path_mxm: Path of the bag-of-words text file.
        mergerd_songs: polars DataFrame with the columns ``track_id``,
            ``artist``, ``title`` and ``play_count``.
    """

    def __init__(self, dataset_path_mxm, mergerd_songs):
        self.dataset_path = dataset_path_mxm
        self.songs = mergerd_songs
        # Filled by load(); initialised here so that querying before load()
        # reports "not found" instead of failing with AttributeError.
        self.top_words = []
        self.filtered_tracks = []

    def load(self):
        """Parse the dataset file into ``top_words`` and ``filtered_tracks``.

        ``filtered_tracks`` becomes a list of ``(track_id, word_counts)``
        tuples, ``word_counts`` mapping a 0-based vocabulary index to the
        number of occurrences. Calling it again replaces the previous
        contents.
        """
        top_words = []
        filtered_tracks = []

        with open(self.dataset_path, 'r', encoding='utf-8') as file:
            for line in file:
                if line.startswith('#') or line.strip() == '':
                    continue
                elif line.startswith('%'):
                    top_words = line[1:].strip().split(',')
                else:
                    elements = line.strip().split(',')
                    track_id = elements[0]
                    word_counts = {}
                    # elements[1] is skipped; the rest are "index:count" pairs.
                    for entry in elements[2:]:
                        parts = entry.split(':')
                        # File indices are 1-based; store them 0-based.
                        word_counts[int(parts[0]) - 1] = int(parts[1])
                    filtered_tracks.append((track_id, word_counts))
        self.top_words = top_words
        self.filtered_tracks = filtered_tracks

    def get_song_lyrics(self, track_id):
        """Return ``{word: count}`` for the first track with the given id.

        Raises:
            ValueError: If no loaded track has this id.
        """
        for candidate_id, word_counts in self.filtered_tracks:
            if candidate_id == track_id:
                return {self.top_words[index]: count for index, count in word_counts.items()}
        raise ValueError(f"Track ID '{track_id}' not found in the dataset.")

    def _songs_by_track_id(self):
        """Map each track id to its first (track_id, artist, title, play_count) row."""
        lookup = {}
        for row in self.songs.select('track_id', 'artist', 'title', 'play_count').iter_rows():
            # setdefault keeps the first row seen for an id.
            lookup.setdefault(row[0], row)
        return lookup

    def get_sorted_tracks_by_keyword(self, keyword, threshold, max_tracks=50):
        """Return the most played tracks whose lyrics contain ``keyword`` often enough.

        Args:
            keyword: Word to look up in the vocabulary.
            threshold: Minimum number of occurrences of ``keyword`` in a track.
            max_tracks: Maximum number of rows to return (default 50).

        Returns:
            A polars DataFrame with the columns ``index_number`` (position of
            the track in the loaded dataset), ``track_id``, ``artist``,
            ``title``, ``play_count`` and ``keyword_count``, sorted by
            ``play_count`` descending; ``None`` (after printing a message) if
            ``keyword`` is not in the vocabulary.
        """
        try:
            keyword_index = self.top_words.index(keyword)
        except ValueError:
            print(f"Keyword '{keyword}' not found in the dataset.")
            return None

        # One pass over the song table instead of one DataFrame filter per
        # matching track.
        songs_by_id = self._songs_by_track_id()

        filtered_tracks = []
        for idx, (track_id, word_counts) in enumerate(self.filtered_tracks):
            keyword_count = word_counts.get(keyword_index, 0)
            if keyword_count < threshold:
                continue
            song_row = songs_by_id.get(track_id)
            # Tracks without an entry in the song table are skipped.
            if song_row is not None:
                _, artist, title, play_count = song_row
                filtered_tracks.append((idx, track_id, artist, title, play_count, keyword_count))
        print("Done ✅ filtering tracks by keyword.")
        # orient='row' states explicitly that every tuple is one row, so the
        # orientation never has to be guessed from the data's dimensions.
        filtered_tracks_df = pl.DataFrame(
            filtered_tracks,
            schema=['index_number', 'track_id', 'artist', 'title', 'play_count', 'keyword_count'],
            orient='row',
        ).sort('play_count', descending=True).head(max_tracks)
        return filtered_tracks_df

In [None]:
# Create the baseline loader and parse the dataset file into memory.
mxm_loader = MXMDataLoader(mxm_dataset_path, mergerd_songs)
mxm_loader.load()

In [None]:
# Exercise: try a keyword (the task suggests 'love'; this run uses 'life') and
# look through the lyrics of 3 random tracks from the recommendations -- does
# the keyword appear in them? The second argument is the minimum number of
# keyword occurrences a track needs to be listed.
tracks = mxm_loader.get_sorted_tracks_by_keyword('life', 6)
tracks

In [None]:
# Plain-text rendering of the recommendation table.
print(tracks)

In [None]:
# How often 'love' occurs in this track. The lyrics dict only contains words
# that occur in the track, so fall back to 0 instead of raising KeyError.
mxm_loader.get_song_lyrics('TRJRECT12903CBADA3').get('love', 0)

In [None]:
import random
# Draw a random row position within `tracks`.
# NOTE(review): `idx` is not used in this cell -- the track id below is
# hard-coded -- so the same track is printed on every run.
idx = random.randint(0, len(tracks) - 1)
track_id = 'TROHFJK12903CC4BCE'
print(f"Track ID: {track_id}")
print(f"Lyrics: {mxm_loader.get_song_lyrics(track_id)}")


In [None]:
# A keyword that is not in the vocabulary does not raise: the loader prints a
# message and returns None, so `incorrect_keyword_tracks` is None here.
incorrect_keyword_tracks = mxm_loader.get_sorted_tracks_by_keyword('incorrect_keyword', 6)
incorrect_keyword_tracks

## 2. word2vec

In [None]:
import gensim.downloader as api

# Name of the pretrained embedding to fetch through gensim's downloader.
# `model_name` is the single source of truth for what gets loaded.
model_name = 'word2vec-google-news-300'
wv = api.load(model_name)

In [None]:
# Sanity check: look up a single embedding. `vec_love` is always bound, so the
# final expression cannot raise NameError (or show a stale value from an
# earlier run) when the word is missing from the model.
try:
    vec_love = wv['love']
except KeyError:
    vec_love = None
    print("The word 'love' does not appear in this model")
vec_love

In [None]:
class MXMDataLoaderW2V:
    """Track recommender that expands the keyword with embedding neighbours.

    The dataset file is parsed line by line: lines starting with ``#`` and
    blank lines are skipped, the line starting with ``%`` holds the
    comma-separated vocabulary, and every other line has the form
    ``track_id,<ignored field>,index:count,...`` where ``index`` is a 1-based
    position in the vocabulary.

    Args:
        dataset_path_mxm: Path of the bag-of-words text file.
        mergerd_songs: polars DataFrame with the columns ``track_id``,
            ``artist``, ``title`` and ``play_count``.
        word_vectors: Object exposing ``most_similar(positive=..., topn=...)``
            that returns ``(word, score)`` pairs and raises ``KeyError`` for
            unknown words.
    """

    def __init__(self, dataset_path_mxm, mergerd_songs, word_vectors):
        self.dataset_path = dataset_path_mxm
        self.songs = mergerd_songs
        self.word_vectors = word_vectors
        self.top_words = []
        self.filtered_tracks = []

    def load(self):
        """Parse the dataset file into ``top_words`` and ``filtered_tracks``.

        ``filtered_tracks`` becomes a list of ``(track_id, word_counts)``
        tuples, ``word_counts`` mapping a 0-based vocabulary index to the
        number of occurrences. The results are built in local lists and
        assigned at the end, so calling load() again replaces the data
        instead of appending every track a second time.
        """
        top_words = []
        filtered_tracks = []

        with open(self.dataset_path, 'r', encoding='utf-8') as file:
            for line in file:
                if line.startswith('#') or line.strip() == '':
                    continue
                elif line.startswith('%'):
                    top_words = line[1:].strip().split(',')
                else:
                    elements = line.strip().split(',')
                    track_id = elements[0]
                    word_counts = {}
                    # elements[1] is skipped; the rest are "index:count" pairs.
                    for entry in elements[2:]:
                        parts = entry.split(':')
                        # File indices are 1-based; store them 0-based.
                        word_counts[int(parts[0]) - 1] = int(parts[1])
                    filtered_tracks.append((track_id, word_counts))
        self.top_words = top_words
        self.filtered_tracks = filtered_tracks

    def get_similar_keywords(self, keyword, top_n=5):
        """Return ``keyword`` followed by its ``top_n`` most similar words.

        If the embedding model does not know ``keyword`` a message is printed
        and only ``[keyword]`` is returned.
        """
        try:
            similar_words = self.word_vectors.most_similar(positive=[keyword], topn=top_n)
            return [keyword] + [word for word, _ in similar_words]  # Include the keyword itself
        except KeyError:
            print(f"Keyword '{keyword}' not found in the word2vec model.")
            return [keyword]

    def _songs_by_track_id(self):
        """Map each track id to its first (track_id, artist, title, play_count) row."""
        lookup = {}
        for row in self.songs.select('track_id', 'artist', 'title', 'play_count').iter_rows():
            # setdefault keeps the first row seen for an id.
            lookup.setdefault(row[0], row)
        return lookup

    def get_sorted_tracks_by_keyword(self, keyword, threshold, max_tracks=50):
        """Return the most played tracks matching ``keyword`` or similar words.

        Args:
            keyword: Word to expand via ``get_similar_keywords``.
            threshold: Minimum combined number of occurrences of the keyword
                and its similar words in a track.
            max_tracks: Maximum number of rows to return (default 50).

        Returns:
            A polars DataFrame with the columns ``index_number`` (position of
            the track in the loaded dataset), ``track_id``, ``artist``,
            ``title``, ``play_count`` and ``keyword_count`` (the combined
            count), sorted by ``play_count`` descending.
        """
        similar_keywords = self.get_similar_keywords(keyword)
        print(f"Similar keywords to '{keyword}': {similar_keywords}")
        # Only words present in the dataset vocabulary can be counted; at most
        # the first five of them are used (the keyword itself comes first).
        similar_keyword_indices = [self.top_words.index(word) for word in similar_keywords if word in self.top_words][:5]

        # One pass over the song table instead of one DataFrame filter per
        # matching track.
        songs_by_id = self._songs_by_track_id()

        filtered_tracks = []
        # enumerate supplies the track's position for `index_number`; the
        # value must not come from an unrelated variable outside this method.
        for position, (track_id, word_counts) in enumerate(self.filtered_tracks):
            total_count = sum(word_counts.get(word_index, 0) for word_index in similar_keyword_indices)
            if total_count < threshold:
                continue
            song_row = songs_by_id.get(track_id)
            # Tracks without an entry in the song table are skipped.
            if song_row is not None:
                _, artist, title, play_count = song_row
                filtered_tracks.append((position, track_id, artist, title, play_count, total_count))
        print("Done ✅ filtering tracks by keyword.")
        # orient='row' states explicitly that every tuple is one row, so the
        # orientation never has to be guessed from the data's dimensions.
        filtered_tracks_df = pl.DataFrame(
            filtered_tracks,
            schema=['index_number', 'track_id', 'artist', 'title', 'play_count', 'keyword_count'],
            orient='row',
        ).sort('play_count', descending=True).head(max_tracks)

        return filtered_tracks_df

In [None]:
# Build the word2vec-backed loader and parse the dataset file.
# NOTE(review): this rebinds `mxm_loader`, so cells run after this one use
# MXMDataLoaderW2V instead of the baseline loader.
# Exercise carried over from the baseline: try 'love' as the keyword and look
# through the lyrics of 3 random recommended tracks.
mxm_loader = MXMDataLoaderW2V(mxm_dataset_path, mergerd_songs, wv)
mxm_loader.load()

In [None]:
# Tracks in which 'happy' and its embedding neighbours together occur at
# least 6 times, most played first (at most 50 rows).
tracks = mxm_loader.get_sorted_tracks_by_keyword('happy', 6, max_tracks=50)
print(tracks)