In [2]:
import pandas as pd
import os
import numpy as np
from mutagen.easyid3 import EasyID3
import librosa

# Location of the extracted track-metadata CSV (absolute path on the author's machine).
tracks_csv_path = '/home/syed/project/fma_metadata/metadata/tracks.csv'

# Load the track metadata with a flat, single-row header; the title matching
# further down reads the 'title' and 'id' columns of this frame.
# low_memory=False makes pandas infer each column's dtype from the whole file
# rather than chunk by chunk, which is the documented remedy for the mixed-type
# warning this call emits (presumably the warning echoed in the cell output).
tracks_df = pd.read_csv(tracks_csv_path, low_memory=False)

# Example function to extract MFCC and Spectral Centroid features from an audio file
def extract_features(audio_file):
    """Compute MFCC and spectral-centroid features for one audio file.

    Args:
        audio_file: Path of the audio file; passed unchanged to ``librosa.load``.

    Returns:
        A ``(mfcc, spectral_centroid)`` tuple holding the arrays returned by
        ``librosa.feature.mfcc`` and ``librosa.feature.spectral_centroid``.
        No averaging is applied here; callers reshape and normalize the
        arrays themselves.
    """
    # Decode the file with librosa's default settings.
    y, sr = librosa.load(audio_file)

    mfcc = librosa.feature.mfcc(y=y, sr=sr)
    spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)

    # The mean MFCC / mean centroid that used to be computed here were never
    # returned or read, so that dead work has been dropped.
    return mfcc, spectral_centroid

# Function to extract titles from MP3 files in a folder
def extract_titles_from_mp3(folder):
    """Map every ``*.mp3`` file name directly inside ``folder`` to its ID3 title.

    A file that has tags but no ``title`` entry maps to ``None``. A file whose
    tags cannot be read is reported on stdout and left out of the result.
    """
    title_by_file = {}

    for filename in os.listdir(folder):
        # Only names ending in a lowercase '.mp3' are considered.
        if not filename.endswith('.mp3'):
            continue

        mp3_path = os.path.join(folder, filename)

        try:
            tags = EasyID3(mp3_path)
            if 'title' in tags:
                title_by_file[filename] = tags['title'][0]
            else:
                title_by_file[filename] = None
        except Exception as e:
            print(f"Error processing {filename}: {e}")

    return title_by_file

# Example usage
folder = '/home/syed/project/fma_metadata/fma_large/000'
titles = extract_titles_from_mp3(folder)
print("Titles:", titles)

# Build a title -> track id lookup once instead of re-filtering the whole frame
# for every audio file. Rows without a title can never match and are dropped;
# keep='first' preserves the earlier "first matching row wins" behaviour.
# Iterating the columns yields built-in Python scalars for a numeric id column,
# which (unlike numpy integers) pymongo can encode when the ids are stored later.
known_tracks = tracks_df.dropna(subset=['title']).drop_duplicates(subset='title', keep='first')
title_to_track_id = dict(zip(known_tracks['title'], known_tracks['id']))

# Dictionary to store mapping between audio file names and track IDs
audio_track_mapping = {}

# Resolve each extracted title; a missing tag title (None) is reported as unmatched.
for audio_filename, title in titles.items():
    if title in title_to_track_id:
        audio_track_mapping[audio_filename] = title_to_track_id[title]
    else:
        print(f"No match found for title '{title}'")

print("Audio Track Mapping:", audio_track_mapping)


from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd
import re

# Define the file path for the extracted CSV file
# (tracks_csv_path, assigned earlier in this notebook, is reused.)

# Load tracks data into a DataFrame, reading the first two rows as a two-level header
tracks_df = pd.read_csv(tracks_csv_path, header=[0, 1])

# Rename the 'id' column to ('track', 'id')
tracks_df.columns = [('track', 'id') if col[0] == 'id' else col for col in tracks_df.columns]

# Select relevant columns for preprocessing
selected_columns = [
    ('track', 'id'),  # Include the ('track', 'id') column separately
    ('album', 'title'), ('album', 'favorites'), ('album', 'listens'),
    ('artist', 'bio'), ('artist', 'location'), ('artist', 'associated_labels'),
    ('track', 'comments'), ('track', 'favorites'), ('track', 'listens'), ('track', 'genre_top')
]

# Work on an independent copy so the assignments below change selected_df itself
# and not a slice of tracks_df.
selected_df = tracks_df[selected_columns].copy()

# Preprocessing steps
# Fill missing values. The keys are the real (group, field) column labels; the
# flat names used previously ('artist_bio', ...) matched no column, so nothing
# was ever filled.
fill_values = {
    ('artist', 'bio'): '',
    ('artist', 'location'): '',
    ('artist', 'associated_labels'): '',
    ('track', 'comments'): 0,
    ('track', 'favorites'): 0,
    ('track', 'listens'): 0,
}
for col, fill_value in fill_values.items():
    selected_df[col] = selected_df[col].fillna(fill_value)

# Text preprocessing for artist_bio
bio = selected_df[('artist', 'bio')].str.lower()  # Lowercasing
bio = bio.fillna('').apply(lambda x: re.sub(r'<[^>]+>', '', x))  # Remove HTML tags
# regex=True is spelled out: without it newer pandas treats the pattern as a literal string.
bio = bio.str.replace(r'[^\w\s]', '', regex=True)  # Remove punctuation
bio = bio.str.replace(r'\s+', ' ', regex=True).str.strip()  # Remove extra whitespaces
selected_df[('artist', 'bio')] = bio

# Text preprocessing for artist_labels
labels = selected_df[('artist', 'associated_labels')].fillna('').apply(lambda x: re.sub(r'<[^>]+>', '', x))  # Remove HTML tags
labels = labels.str.replace(r'\s*,\s*', ',', regex=True)  # Remove spaces around commas
selected_df[('artist', 'associated_labels')] = labels.str.split(',')  # Split the string by commas
from sklearn.preprocessing import LabelEncoder, StandardScaler

# Initialize encoders and scaler
label_encoders = {}
scaler = StandardScaler()

# Separate numerical and categorical columns
numerical_columns = [('album', 'favorites'), ('album', 'listens'),
                     ('track', 'comments'), ('track', 'favorites'), ('track', 'listens')]
categorical_columns = [('album', 'title'), ('artist', 'bio'), ('artist', 'location'), ('artist', 'associated_labels'), ('track', 'genre_top')]

# Encode every categorical column as integer codes. Values are stringified
# first, so the label lists produced above are encoded by their string form.
for col in categorical_columns:
    label_encoders[col] = LabelEncoder()
    selected_df[col] = label_encoders[col].fit_transform(selected_df[col].astype(str))
import numpy as np

# Replace non-numeric values with NaN
selected_df[numerical_columns] = selected_df[numerical_columns].apply(pd.to_numeric, errors='coerce')

# Drop rows that still have NaN in a numerical column
selected_df = selected_df.dropna(subset=numerical_columns)

# Normalization for numerical columns
selected_df[numerical_columns] = scaler.fit_transform(selected_df[numerical_columns])

# Show the first rows of the numerical columns
print("Data types of numerical columns for the first few rows:")
print(selected_df[numerical_columns].head())

# Show the first rows of the categorical columns
print("\nData types of categorical columns for the first few rows:")
print(selected_df[categorical_columns].head())
from pymongo import MongoClient
import os
import numpy as np

# Connect to MongoDB
mongo_uri = "mongodb://localhost:27017"
client = MongoClient(mongo_uri)

# Access or create a database
db = client['audio_features_database']

# Access or create a collection
collection = db['audio_features_collection']


def _to_native(value):
    """Return numpy scalars as the equivalent built-in Python value."""
    return value.item() if isinstance(value, np.generic) else value


# Iterate over the audio files and store their features in the collection
for audio_filename, track_id in audio_track_mapping.items():
    # Look the metadata up first: a track without a row in selected_df (e.g.
    # removed by the NaN filter) is skipped instead of raising IndexError,
    # and no audio decoding is wasted on it.
    metadata_rows = selected_df[selected_df[('track', 'id')] == track_id]
    if metadata_rows.empty:
        print(f"No metadata found for track ID {track_id}; skipping {audio_filename}")
        continue
    metadata = metadata_rows.iloc[0]

    # Construct the full path to the audio file
    audio_file = os.path.join(folder, audio_filename)

    # Extract features from the audio file
    mfcc_features, spectral_centroid_feature = extract_features(audio_file)

    # Reshape the features into a single column for normalization
    mfcc_features_reshaped = mfcc_features.reshape(-1, 1)
    spectral_centroid_feature_reshaped = np.array([spectral_centroid_feature]).reshape(-1, 1)

    # Normalize each feature set with its own throw-away scaler; refitting the
    # shared `scaler` here would overwrite the statistics it learned from the
    # metadata columns.
    normalized_mfcc_features = StandardScaler().fit_transform(mfcc_features_reshaped).flatten().tolist()
    normalized_spectral_centroid_feature = StandardScaler().fit_transform(spectral_centroid_feature_reshaped).flatten().tolist()

    # Create the document; numpy scalars are converted because BSON cannot
    # encode numpy integer types.
    document = {
        'track_id': _to_native(track_id),
        'audio_filename': audio_filename,
        'normalized_mfcc_features': normalized_mfcc_features,
        'normalized_spectral_centroid_feature': normalized_spectral_centroid_feature,
        'title': _to_native(metadata[('album', 'title')]),
        'favorites': _to_native(metadata[('album', 'favorites')]),
        'listens': _to_native(metadata[('album', 'listens')]),
        'genre': _to_native(metadata[('track', 'genre_top')])
    }

    # Upsert keyed on track_id so re-running the cell refreshes the stored
    # document instead of adding a duplicate of the same track.
    collection.replace_one({'track_id': document['track_id']}, document, upsert=True)

print("Audio features along with metadata have been successfully stored in MongoDB.")
from pymongo import MongoClient
from annoy import AnnoyIndex
from pymongo import MongoClient
from annoy import AnnoyIndex

# Connect to MongoDB.
# Each failure is reported and then re-raised: calling exit() inside a notebook
# shuts the interpreter down instead of stopping the cell at the real error.
try:
    client = MongoClient('mongodb://localhost:27017/')
    db = client['audio_features_database']
    collection = db['audio_features_collection']
except Exception as e:
    print("Error connecting to MongoDB:", e)
    raise

# Retrieve data from MongoDB (only the fields needed below are projected)
try:
    data = collection.find({}, {'normalized_mfcc_features': 1, 'listens': 1, 'favorites': 1, 'genre': 1, 'normalized_spectral_centroid_feature': 1, 'title': 1, 'track_id': 1})
except Exception as e:
    print("Error retrieving data from MongoDB:", e)
    raise

# Convert MongoDB cursor to a list of dictionaries
try:
    data_list = list(data)
except Exception as e:
    print("Error converting MongoDB cursor to list:", e)
    raise

# Extract features and other relevant information as parallel lists:
# position i in every list belongs to the same document.
mfcc_features = [doc.get('normalized_mfcc_features', []) for doc in data_list]
spectral_centroid_features = [doc.get('normalized_spectral_centroid_feature', []) for doc in data_list]
titles = [doc.get('title', '') for doc in data_list]
track_ids = [doc.get('track_id', '') for doc in data_list]
genre = [doc.get('genre', '') for doc in data_list]

# Combine MFCC and spectral centroid features into one vector per document
combined_features = [np.hstack((mfcc, centroid)) for mfcc, centroid in zip(mfcc_features, spectral_centroid_features)]

# Build ANN index; its dimensionality is taken from the first vector
num_features = len(combined_features[0]) if combined_features else 0
ann_index = AnnoyIndex(num_features, 'euclidean')
# Add items to the index, using the list position as the Annoy item id so that
# results can be mapped back through track_ids / titles.
for i, feature_vector in enumerate(combined_features):
    # Only vectors matching the index dimensionality can be added
    if len(feature_vector) == num_features:
        ann_index.add_item(i, feature_vector)
    else:
        # This branch had no body, which made the whole cell a syntax error.
        print(f"Skipping item {i}: expected {num_features} features, got {len(feature_vector)}")

# Build the index
ann_index.build(n_trees=100)

# Function to recommend tracks based on a given track ID
def recommend_tracks(track_id, num_recommendations=5):
    """Return up to ``num_recommendations`` tracks similar to ``track_id``.

    The track is located in the module-level ``track_ids`` list and its
    neighbours are fetched from the module-level ``ann_index``.

    Args:
        track_id: Identifier to look up; must compare equal to an entry of
            ``track_ids``.
        num_recommendations: Maximum number of neighbours to return.

    Returns:
        A list of ``(track_id, title)`` tuples in the order the index returns
        them, never containing the query track itself. An unknown ``track_id``
        prints a message and yields ``[]``.
    """
    try:
        track_index = track_ids.index(track_id)
    except ValueError:
        print(f"Track ID '{track_id}' not found.")
        return []

    wanted = max(num_recommendations, 0)
    # The query item is its own nearest neighbour, so request one extra result
    # and filter it out; otherwise a track would be recommended for itself.
    candidates = ann_index.get_nns_by_item(track_index, wanted + 1)
    neighbor_indices = [idx for idx in candidates if idx != track_index][:wanted]

    return [(track_ids[idx], titles[idx]) for idx in neighbor_indices]

# Example usage
# NOTE(review): the ID is passed as the string '1'. recommend_tracks finds it via
# track_ids.index(), i.e. by equality, so this only matches if the track_id values
# loaded from MongoDB are strings as well — confirm the stored type.
track_id_to_recommend_for = '1'
recommendations = recommend_tracks(track_id_to_recommend_for)
print("Recommended tracks:")
# Each recommendation is a (track_id, title) pair.
for track_id, title in recommendations:
    print(f"Track ID: {track_id}, Title: {title}")
from sklearn.metrics import average_precision_score
from sklearn.metrics import ndcg_score

# Placeholder relevance judgements: each key maps to the list of track IDs that
# count as relevant for it (replace this with your actual ground truth data).
ground_truth = dict(
    user1=['track3', 'track1'],
    user2=['track2', 'track1', 'track4'],
    # Add more users and their preferences here
)

# Function to evaluate the model using MAP and NDCG
def evaluate_model(recommendation_function):
    """Print the mean average precision and mean NDCG over ``ground_truth``.

    Every key of the module-level ``ground_truth`` dict is passed to
    ``recommendation_function``. A recommended track counts as relevant when it
    appears in that key's ground-truth list.

    Args:
        recommendation_function: Callable taking one argument and returning an
            iterable of ``(track_id, title)`` tuples, assumed to be ordered
            best-first.

    Returns:
        None. The two averages are printed.
    """
    map_scores = []
    ndcg_scores = []

    for user, ground_truth_tracks in ground_truth.items():
        recommended_tracks = [track_id for track_id, _ in recommendation_function(user)]

        # 1 where the recommended track is in the ground truth, else 0
        relevance = [1 if track in ground_truth_tracks else 0 for track in recommended_tracks]
        # Ranking scores must decrease with position so the first recommendation
        # is treated as the most confident one (ascending scores invert the ranking).
        rank_scores = list(range(len(relevance), 0, -1))

        # Average precision is undefined without a relevant item; count it as 0.
        if any(relevance):
            map_scores.append(average_precision_score(relevance, rank_scores))
        else:
            map_scores.append(0.0)

        # Score the actual ranking against the relevance labels. The result gets
        # its own name: assigning to `ndcg_score` here would make that name local
        # and the call would fail with UnboundLocalError.
        if len(relevance) > 1:
            ndcg_value = ndcg_score([relevance], [rank_scores])
        else:
            # Zero or one recommendation: NDCG reduces to that item's relevance.
            ndcg_value = float(sum(relevance))
        ndcg_scores.append(ndcg_value)

    # Calculate average MAP and NDCG scores (0.0 when there is no ground truth at all)
    average_map = sum(map_scores) / len(map_scores) if map_scores else 0.0
    average_ndcg = sum(ndcg_scores) / len(ndcg_scores) if ndcg_scores else 0.0

    print("Average MAP:", average_map)
    print("Average NDCG:", average_ndcg)

# Call the evaluate_model function with your recommendation function
# NOTE(review): evaluate_model passes each ground_truth key (a user name) to the
# function it is given, while recommend_tracks looks its argument up as a track
# ID — confirm this pairing is intended.
evaluate_model(recommend_tracks)

  tracks_df = pd.read_csv(tracks_csv_path)


Titles: {'000569.mp3': 'Shadow Law', '000483.mp3': 'Magnolias Milan', '000565.mp3': 'Sophia', '000495.mp3': 'Waterfall (Emerald City Coffee, Seattle)', '000554.mp3': 'Disturbed', '000561.mp3': 'Fairies', '000546.mp3': 'If You Have No One', '000515.mp3': "Let's Climb a Mossy Hill", '000468.mp3': 'Vast and Sad (KPSU)', '000501.mp3': 'Utmost House', '000576.mp3': 'Shot At the Post Office', '000490.mp3': 'Vast and Sad', '000469.mp3': 'Waverly', '000516.mp3': 'Kohoutek', '000513.mp3': 'Victoria 2', '000477.mp3': "Let's Climb a Mossy Hill (Kelly Haus)", '000481.mp3': 'Council Bluffs', '000514.mp3': 'Four Doors', '000538.mp3': 'Your Magic Motion', '000549.mp3': 'Venice, CA', '000560.mp3': 'Serious', '000493.mp3': 'In the Sky', '000476.mp3': 'Leftward Glance', '000498.mp3': 'Birdlike Movements', '000512.mp3': None, '000502.mp3': 'Vast and Sad (Showoff Gallery, Bellingham)', '000536.mp3': 'Riding On Your Fears', '000500.mp3': 'Longview, WA', '000470.mp3': 'Cloud Light', '000571.mp3': 'Medicine 

In [3]:
from pymongo import MongoClient

# Connect to MongoDB (assuming it's running locally on the default port)
client = MongoClient('localhost', 27017)

# Access or create a database
db = client['audio_features_database']

# Access or create a collection
collection = db['audio_features_collection']

# Iterate over the audio files and store their features in the collection
for audio_filename, track_id in audio_track_mapping.items():
    # Construct the full path to the audio file
    audio_file = os.path.join(folder, audio_filename)

    # Extract features from the audio file
    mfcc_features, spectral_centroid_feature = extract_features(audio_file)

    # Reshape the features into a single column for normalization
    mfcc_features_reshaped = mfcc_features.reshape(-1, 1)
    spectral_centroid_feature_reshaped = np.array([spectral_centroid_feature]).reshape(-1, 1)

    # Normalize with a fresh scaler per feature set so the shared `scaler`
    # object keeps whatever it was fitted on elsewhere in the notebook.
    normalized_mfcc_features = StandardScaler().fit_transform(mfcc_features_reshaped).flatten().tolist()
    normalized_spectral_centroid_feature = StandardScaler().fit_transform(spectral_centroid_feature_reshaped).flatten().tolist()

    # BSON cannot encode numpy integer scalars, so store a built-in value.
    native_track_id = track_id.item() if isinstance(track_id, np.generic) else track_id

    # Create a document to store in the collection
    document = {
        'track_id': native_track_id,
        'audio_filename': audio_filename,
        'normalized_mfcc_features': normalized_mfcc_features,
        'normalized_spectral_centroid_feature': normalized_spectral_centroid_feature
    }

    # Upsert keyed on track_id: an existing document for the track has these
    # fields refreshed (its other fields are kept), and a new one is created
    # only when none exists, so re-running the cell adds no duplicates.
    collection.update_one({'track_id': native_track_id}, {'$set': document}, upsert=True)

print("Audio features have been successfully stored in MongoDB.")


Audio features have been successfully stored in MongoDB.


In [None]:

# Find all documents
# find() is called without a filter, so the cursor covers the whole collection.
result_all = collection.find()
print("Found all documents:")
# NOTE(review): every document is printed in full, including its stored feature
# lists — consider projecting fields or limiting the cursor if the output gets large.
for document in result_all:
    print(document)
