# Programming Assignment 1
## Genre Classification using Locality Sensitive Hashing (LSH)


In [24]:
#Imports

import pandas as pd
import numpy as np

### Data Loading and Preprocessing 

In [136]:
"""
TODO: Check whether the indices of X_train get mixed up relative to the labels.
"""
# Load data
df_tracks = pd.read_csv('tracks.csv', index_col=0, header=[0, 1])
df_tracks = df_tracks[df_tracks['set']['subset'] == 'medium']
df_features = pd.read_csv('features.csv', index_col=0, header=[0, 1, 2])

# Filter by genres
GENRES = ['Hip-Hop', 'Pop', 'Folk', 'Rock', 'Experimental', 'International', 'Electronic', 'Instrumental']
df_tracks = df_tracks[df_tracks['track']['genre_top'].isin(GENRES)]

# Split df_tracks into training, testing, and validation sets.
# NOTE(review): column 30 is presumably the ('set', 'split') column - confirm
# against the CSV header and prefer the label over the magic position.
split_column = df_tracks.iloc[:, 30]
df_tracks_train = df_tracks[split_column == 'training']
df_tracks_test = df_tracks[split_column == 'test']
df_tracks_validation = df_tracks[split_column == 'validation']

# Match features with tracks for training, testing, and validation.
# The boolean mask keeps the row order of df_features, not of df_tracks.
df_features_train = df_features[df_features.index.isin(df_tracks_train.index)]
df_features_test = df_features[df_features.index.isin(df_tracks_test.index)]
df_features_validation = df_features[df_features.index.isin(df_tracks_validation.index)]

# Create tuples of (data, indices) for each dataset.
# The raw feature arrays are available as the first element of each tuple.
train_data_with_indices = (df_features_train.values, df_features_train.index)
test_data_with_indices = (df_features_test.values, df_features_test.index)
validation_data_with_indices = (df_features_validation.values, df_features_validation.index)

# Extract genre labels, re-ordered by the feature index so that row i of the
# feature array and element i of the label Series always describe the same
# track (features and labels are selected independently above, so positional
# alignment would otherwise only hold by accident).
y_train = df_tracks_train['track']['genre_top'].loc[df_features_train.index]
y_test = df_tracks_test['track']['genre_top'].loc[df_features_test.index]
y_validation = df_tracks_validation['track']['genre_top'].loc[df_features_validation.index]

# Guard the alignment explicitly; this fails loudly if a track index is duplicated.
assert y_train.index.equals(df_features_train.index)
assert y_test.index.equals(df_features_test.index)
assert y_validation.index.equals(df_features_validation.index)

In [143]:
# Small debugging sample: the first five training rows and their track indices.
# train_data_with_indices is a (feature array, index) tuple.
X_debug, indices_debug = train_data_with_indices

X_debug_first5, indices_debug_first5 = X_debug[:5], indices_debug[:5]

# Same (data, indices) layout as the full data set, so it can be passed to the
# same functions.
debug_data_with_indices_first5 = (X_debug_first5, indices_debug_first5)


### Random Projection Matrix

In [34]:
def generate_random_matrix(r_i, r_j, rng=None):
    """Draw a sparse random projection matrix.

    Every entry is sqrt(3) * s, where s is -1, 0 or +1 with probabilities
    1/6, 2/3 and 1/6 respectively.

    Parameters:
    - r_i: number of rows.
    - r_j: number of columns.
    - rng: optional source of randomness for reproducible results. ``None``
      (default) uses NumPy's global random state, exactly as before; an
      integer is used as the seed of a fresh generator; any other object must
      provide a NumPy-style ``choice`` method (e.g. ``np.random.default_rng()``).

    Returns:
    - A float array of shape (r_i, r_j).
    """
    if rng is None:
        source = np.random
    elif isinstance(rng, (int, np.integer)):
        source = np.random.default_rng(rng)
    else:
        source = rng
    rij = source.choice([-1, 0, 1], size=(r_i, r_j), p=[1/6, 2/3, 1/6])
    return np.sqrt(3) * rij

### Hashtable generator function

We project the feature matrix onto random hyperplanes by taking its dot product with the transposed random projection matrix. This reduces the dimensionality and tells us the orientation of each track relative to every hyperplane.
We then binarize these orientations and use the resulting bit string as the key of the bucket in which the track is stored:
$ \begin{cases} 
1 & \text{ if } x > 0 \\
0 & \text{ else}
\end{cases}
$ 
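This works because $\mathbf{a} \cdot \mathbf{b} = \|\mathbf{a}\| \|\mathbf{b}\| \cos(\theta)$: the sign of the dot product depends only on $\cos(\theta)$, so a positive value means the track lies on one side of the hyperplane and a negative value means it lies on the other.
This whole process produces one hash table.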

In [144]:
"""
The binary representations are of length l.
And the number of hash tables we create is equal to n.
"""
def hashtable_generator(data_indices_tuple, l=32, n=20):
    """Build ``n`` hash tables from ``l`` random projections each.

    Parameters:
    - data_indices_tuple: ``(X, indices)`` where ``X`` is the 2-D feature
      array and ``indices[i]`` is the label stored for row ``i``.
    - l: length of the bit string used as bucket key.
    - n: number of hash tables to create.

    Returns:
    - A list of ``(buckets, random_matrix)`` tuples. ``buckets`` maps a bit
      string to the list of labels whose rows produced it, and
      ``random_matrix`` is the projection matrix that generated the table.
    """
    features, row_labels = data_indices_tuple
    tables = []

    for _ in range(n):
        projection = generate_random_matrix(l, features.shape[1])
        # One row of 0/1 flags per sample: 1 where the projection is strictly positive.
        bits = (np.dot(features, projection.T) > 0).astype(int)

        table = {}
        for position, row_bits in enumerate(bits):
            key = ''.join(row_bits.astype(str))
            # Store the caller-supplied label rather than the row position.
            table.setdefault(key, []).append(row_labels[position])

        tables.append((table, projection))

    return tables


In [148]:
# Build the hash tables for the five-row debugging sample.
hash_debug = hashtable_generator(debug_data_with_indices_first5)

### Similar Songs Finder
In this step we use the computed hash tables and their corresponding random matrices to find all songs that are similar to the input song. 
> A music track is defined as similar if it is in the same bucket as $t_i$ in one of the $n$ hash tables.


In [149]:
import numpy as np

def find_similar_songs(song_input, hash_tables_and_matrices):
    """
    Collect the indices of every song that shares a bucket with the input song.

    Parameters:
    - song_input: The feature array of the query song.
    - hash_tables_and_matrices: A list of (buckets, random_matrix) tuples, where
      buckets maps a hash string to a list of song indices and random_matrix is
      the projection that was used to build that table.

    Returns:
    - A list (without duplicates, in no particular order) of the indices found
      in the query song's bucket of any table.
    """
    matches = set()

    for table, projection in hash_tables_and_matrices:
        # Hash the query with the same projection that built this table.
        bits = (np.dot(song_input, projection.T) > 0).astype(int)
        key = ''.join(bits.astype(str))

        # A missing bucket simply contributes nothing.
        matches.update(table.get(key, ()))

    return list(matches)


In [152]:
# Look up every track that shares a bucket with the first debug song in at least one table.
sim_debug = find_similar_songs(X_debug[0], hash_debug)

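The same lookup, repeated several times with freshly generated hash tables; the final genre is decided by majority vote over the individual runs.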

In [30]:
def find_matching_song_multiple(times, song_input):
    """Predict a genre for ``song_input`` by voting over several LSH runs.

    Each run builds fresh hash tables from the training data, looks up the
    songs that share a bucket with ``song_input`` and records the most common
    genre among them. The genre recorded most often across all runs wins.

    Parameters:
    - times: number of independent runs (hash table rebuilds).
    - song_input: feature array of the song to classify.

    Returns:
    - The winning genre, or ``None`` if no run found a similar song.
    """
    # Local import keeps this cell self-contained.
    from collections import Counter

    found_categories = []
    for run in range(times):
        print("doing it times ", run)
        # hashtable_generator expects the (data, indices) tuple, not the bare array.
        tables = hashtable_generator(train_data_with_indices)
        similar_indices = find_similar_songs(song_input, tables)
        if len(similar_indices) == 0:
            continue
        # The buckets store index labels, so look genres up by label (.loc),
        # not by position (.iloc).
        genres = y_train.loc[list(similar_indices)].tolist()
        # Counter breaks ties by first occurrence, unlike max() over a set,
        # whose tie-breaking depends on hash order.
        found_categories.append(Counter(genres).most_common(1)[0][0])

    if not found_categories:
        return None
    return Counter(found_categories).most_common(1)[0][0]

### Distance Computation of Similar Songs
This function computes the distance between the input song and each of its similar songs and returns the indices of the closest ones.

In [161]:
import numpy as np

def compute_distances(train_data_with_indices, song_input, similar_songs_indices, metric="euclid", cut=10):
    """Rank candidate songs by their distance to the input song.

    Parameters:
    - train_data_with_indices: ``(X_data, X_indices)`` where ``X_indices[i]``
      is the index label of row ``i`` of ``X_data``.
    - song_input: feature array of the query song.
    - similar_songs_indices: index labels of the candidate songs.
    - metric: ``"euclid"`` (Euclidean distance) or ``"cosine"``
      (1 - cosine similarity).
    - cut: keep only the ``cut`` closest songs; ``None`` keeps all of them.

    Returns:
    - The candidate index labels ordered from closest to farthest. Candidates
      with equal distance keep their input order.

    Raises:
    - ValueError: if ``metric`` is neither ``"euclid"`` nor ``"cosine"``.
    - KeyError: if a candidate label does not occur in ``X_indices``.
    """
    X_data, X_indices = train_data_with_indices  # Unpack the tuple
    index_to_position = {index: pos for pos, index in enumerate(X_indices)}

    if metric == "euclid":
        def distance_to(candidate):
            return np.linalg.norm(candidate - song_input)
    elif metric == "cosine":
        # The query norm does not change between candidates; compute it once.
        norm_input = np.linalg.norm(song_input)

        def distance_to(candidate):
            denominator = np.linalg.norm(candidate) * norm_input
            if denominator == 0:
                # Cosine similarity is undefined for a zero vector. Treat it as
                # "no similarity" (distance 1) instead of producing NaN, which
                # would make the sort order below unpredictable.
                return 1.0
            return 1 - np.dot(candidate, song_input) / denominator
    else:
        raise ValueError("Invalid metric specified. Use 'euclid' or 'cosine'.")

    scored_songs = [
        (index, distance_to(X_data[index_to_position[index]]))
        for index in similar_songs_indices
    ]

    sorted_songs = sorted(scored_songs, key=lambda pair: pair[1])
    if cut is not None:
        sorted_songs = sorted_songs[:cut]

    return [index for index, _ in sorted_songs]

In [162]:
# Rank the candidate songs by distance once, keep the result, and show it.
matching_s = compute_distances(debug_data_with_indices_first5, X_debug[0], sim_debug)
print(matching_s)

[3, 134, 139, 198, 136]


### Getting the Genre by Majority vote

In [163]:
import pandas as pd

def determine_genre_by_majority_vote(song_indices, Y):
    """
    Determines the most common genre among the given song indices.

    Parameters:
    - song_indices: A list of indices for the songs.
    - Y: A pandas Series where the index corresponds to song indices and the values to genres.

    Returns:
    - The genre that occurs most frequently among the given songs, or None if
      none of the indices occurs in Y (or all matching genres are missing).

    Raises:
    - ValueError: if Y is not a pandas Series.
    """

    # Label-based lookup below requires a Series.
    if not isinstance(Y, pd.Series):
        raise ValueError("Y must be a pandas Series mapping song indices to genres.")

    # Keep only the indices that actually occur as labels in Y; unknown labels
    # would make .loc raise a KeyError.
    valid_indices = [i for i in song_indices if i in Y.index]

    # Count the genres of the remaining songs (missing values are not counted).
    genre_counts = Y.loc[valid_indices].value_counts()

    # idxmax() raises on an empty Series, so report "no vote" explicitly.
    if genre_counts.empty:
        return None

    return genre_counts.idxmax()



In [164]:
# Majority genre among the ranked candidate songs; shown as the cell's output.
determine_genre_by_majority_vote(matching_s, y_train)

'Hip-Hop'

In [39]:
def test_accuracy_with_find_matching_songs_multiple_optimized(times):
    correct = 0
    two_d_array = [[0 for _ in range(times)] for _ in range(len(X_test))]
    for _ in range(times):
        print("iteration",_)
        table = hashtable_generator(X_train)
        for i in range(len(X_test)):
            song = X_test[i]
            genres = []
            matching_songs = find_similar_songs(song,table)
            if len(matching_songs) == 0:
                continue
            for element in matching_songs:
                genres.append(y_train.iloc[element])  # Remove [0] subscript here
            two_d_array[i][_] = (max(set(genres), key=genres.count))
    for i in range(len(X_test)):
        genres = [two_d_array[i][_] for _ in range(times)]
        if max(set(genres), key=genres.count) == y_test.iloc[i]:
            correct += 1
    accuracy = correct/len(X_test)
    #print(f"Accuracy Test set advanced: {accuracy}")
    return accuracy
# Run the evaluation with two rounds of freshly generated hash tables and print the accuracy.
print(test_accuracy_with_find_matching_songs_multiple_optimized(2))

iteration 0
iteration 1
0.3993485342019544
