# Preprocessing for Music Recommendation System

### Importing necessary libraries

In [2]:
import os
import pandas as pd
import numpy as np
import librosa
from tqdm import tqdm
from mutagen.mp3 import MP3
from mutagen.easyid3 import EasyID3
from pymongo import MongoClient

### Function to extract MFCC features from the audio files

In [3]:
def extract_mfcc(file_path, num_mfcc=20, max_length=500):
    """Compute a fixed-size MFCC matrix for one audio file.

    The file is decoded at its native sample rate (``sr=None``), ``num_mfcc``
    coefficients are computed per frame, and the frame axis is zero-padded on
    the right or truncated so that exactly ``max_length`` frames remain.

    Args:
        file_path: Path of the audio file to decode.
        num_mfcc: Number of MFCC coefficients per frame.
        max_length: Number of frames kept after padding/truncation.

    Returns:
        The fitted matrix transposed so frames come first, i.e. shape
        ``(max_length, num_mfcc)``; or ``None`` if anything fails. Errors are
        printed rather than raised so a bulk extraction loop can skip
        unreadable files.
    """
    try:
        signal, sample_rate = librosa.load(file_path, sr=None)
        coeffs = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=num_mfcc)
        n_frames = coeffs.shape[1]
        if n_frames >= max_length:
            fitted = coeffs[:, :max_length]
        else:
            # Pad only the time axis (axis 1), on the right, with zeros.
            fitted = np.pad(coeffs, ((0, 0), (0, max_length - n_frames)), mode='constant')
        return fitted.T
    except Exception as err:
        print(f"Error processing {file_path}: {err}")
        return None
            

### Function to get files from the specified directory

In [4]:
def iterate_files(root_folder):
    """Lazily yield the path of every file under ``root_folder``, recursively.

    Paths come out in ``os.walk`` (top-down) order and are built by joining the
    walked directory with the file name, so they stay relative when
    ``root_folder`` is relative. A missing folder yields nothing.
    """
    for directory, _subdirs, filenames in os.walk(root_folder):
        yield from (os.path.join(directory, name) for name in filenames)


# Dataset root scanned by the extraction cells (relative to the working directory).
root_folder = 'fma_large'

### Extracting MFCC features and storing them in MongoDB

In [12]:
# Connect to MongoDB; the context manager closes the client even if the
# extraction is interrupted part-way through.
with MongoClient('mongodb://localhost:27017') as client:  # Connection URI for MongoDB
    db = client['BDA_Project']
    collection = db['songs_data']

    # iterate through audio files in the folder and its sub folders
    for file_path in tqdm(iterate_files(root_folder)):
        if not file_path.endswith('.mp3'):
            continue
        mfcc = extract_mfcc(file_path)
        if mfcc is None:
            # extract_mfcc already printed the error; skip this file.
            continue
        # One document per track: the (frames x coefficients) matrix flattened
        # row-major into a plain list, plus the path, from which later steps
        # derive the track id.
        collection.insert_one({
            'feature': mfcc.flatten().tolist(),
            'path': file_path,
        })
    

### Extracting the songs' metadata and storing it in MongoDB

In [None]:
# Connect to MongoDB; the context manager closes the client even if the scan fails.
with MongoClient('mongodb://localhost:27017') as client:  # Connection URI for MongoDB
    db = client['BDA_Project']
    collection = db['songs_meta_data']

    # ID3 fields copied into each metadata document when the file has them.
    wanted_tags = ("album", "title", "artist", "genre", "date")

    # iterate through audio files in the folder and its sub folders
    for file_path in tqdm(iterate_files(root_folder)):
        if not file_path.endswith('.mp3'):
            continue
        try:
            # EasyID3 exposes tags under simple names; each value is a list.
            audio = MP3(file_path, ID3=EasyID3)
        except Exception as e:
            # Same policy as extract_mfcc: report unreadable files and keep
            # going instead of aborting the whole scan.
            print(f"Error reading tags from {file_path}: {e}")
            continue
        available = set(audio.keys())
        metadata = {"path": file_path}
        metadata.update({key: audio[key][0] for key in wanted_tags if key in available})
        collection.insert_one(metadata)

316it [00:00, 678.80it/s]


In [None]:
<prompt>
I need to make a readme file for the following project. The files run in the following order:
1. preprocess.ipynb (preprocesses the data and stores it in MongoDB)
2. music_recommendation_system_using_Kmeans.py (performs Kmeans clustering on the data)
3. Kmeans_evaluation.py (computes the silhouette score and elbow plot to evaluate the Kmeans clustering)
4. app.py (Flask app that serves the recommendation system and also calls the producer and consumer functions as needed.)
<prompt/>
<preprocess.ipynb>
    File attached above
<preprocess.ipynb/>

<music_recommendation_system_using_Kmeans.py>
    from pyspark.sql import SparkSession
    import pymongo
    import pandas as pd
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType
    from pyspark.sql.functions import substring
    from pyspark.sql.functions import udf
    from pyspark.ml.linalg import Vectors, VectorUDT
    from pyspark.ml.clustering import KMeans

    import shutil

    # Initialize Spark
    spark = SparkSession \
        .builder \
        .appName("Music Recommendation Model") \
        .getOrCreate()

    # Load all feature documents written by preprocess.ipynb. The context
    # manager closes the MongoDB connection once the documents are in pandas.
    with pymongo.MongoClient("mongodb://localhost:27017/") as client:
        collection = client["BDA_Project"]["songs_data"]
        df_pandas = pd.DataFrame(list(collection.find()))

    # Drop the Mongo _id field; it is not part of the Spark schema below.
    df_pandas = df_pandas.drop(columns=['_id'])
    # Convert the array elements to floating-point values
    df_pandas["feature"] = df_pandas["feature"].apply(lambda x: [float(i) for i in x])

    # Define the schema for the Spark DataFrame
    schema = StructType([
        StructField("feature", ArrayType(FloatType()), True),
        StructField("path", StringType(), True),
    ])

    df = spark.createDataFrame(df_pandas, schema = schema)

    # KMeans needs an ML Vector column, not an array column.
    array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())
    df = df.select(array_to_vector_udf(df['feature']).alias('features'), df['path'])

    # The label is the 6-digit FMA track id taken from the file name.
    # substring() is 1-based: (15, 6) selects characters 15-20, which is the
    # file name for paths shaped like "fma_large/XXX/NNNNNN.mp3" (any other
    # prefix length breaks this).
    df = df.withColumn("label", substring("path", 15, 6))
    # Cast the extracted track id to an integer label
    df = df.withColumn("label", df["label"].cast("int"))

    kmeans = KMeans(featuresCol = "features", k = 10, seed = 1)
    model = kmeans.fit(df)

    # (label, features, path) tuples, used by consumer.py for lookups.
    feature_rdd = df.select("label", "features", "path").rdd.map(lambda x: (x[0], x[1], x[2]))

    # Persist the artefacts loaded by the evaluation script and the consumer.
    # Spark refuses to write to an existing path by default, so overwrite to
    # make the script re-runnable.
    model.write().overwrite().save('kmeans_model')
    # saveAsPickleFile has no overwrite option: remove the previous output
    # first (assumes the default local filesystem, not HDFS).
    shutil.rmtree("feature_rdd.pickle", ignore_errors=True)
    feature_rdd.saveAsPickleFile("feature_rdd.pickle")
    df.write.mode('overwrite').parquet('df.parquet')

    # Stop Spark session
    spark.stop()
<music_recommendation_system_using_Kmeans.py/>

<Kmeans_evaluation.py>
    from pyspark.sql import SparkSession
    from pyspark.ml.clustering import KMeansModel
    from pyspark.ml.evaluation import ClusteringEvaluator

    # Spark session used only to score the saved clustering.
    spark = (
        SparkSession.builder
        .appName("kmeans evaluation for Similar Songs")
        .getOrCreate()
    )

    # Feature table and model saved by the K-means training script.
    df = spark.read.parquet('df.parquet')
    model = KMeansModel.load('kmeans_model')
    # transform() adds a 'prediction' column with each row's cluster index.
    predictions = model.transform(df)

    # Silhouette score of the clustering: in [-1, 1], higher means tighter,
    # better-separated clusters.
    evaluator_silhouette = ClusteringEvaluator(
        featuresCol='features',
        predictionCol='prediction',
        metricName='silhouette',
    )
    silhouette_score = evaluator_silhouette.evaluate(predictions)
    print("Silhouette Score:", silhouette_score)

    spark.stop()
<Kmeans_evaluation.py/>

<app.py>
    from flask import Flask, render_template, send_from_directory, url_for
    from pymongo import MongoClient
    from bson.objectid import ObjectId
    import os
    import requests
    from producer import send_song_data
    import json
    from flask import jsonify
    import time


    # Metadata collection written by the preprocessing notebook (one document
    # per MP3 with its path and whichever ID3 tags were found).
    client = MongoClient("mongodb://localhost:27017/")
    db = client.BDA_Project
    collection = db.songs_meta_data

    app = Flask(__name__)

    # Last.fm API key for album-art lookups, read from the environment so the
    # secret is not committed with the source. Album-art requests made without
    # a valid key will not return artwork.
    # NOTE(review): the key previously hard-coded here should be treated as
    # leaked and rotated.
    LASTFM_API_KEY = os.environ.get('LASTFM_API_KEY', '')

    def get_album_art(artist, album):
        """Return the URL of an album's largest cover image from Last.fm.

        Args:
            artist: Artist name sent to Last.fm.
            album: Album name sent to Last.fm.

        Returns:
            The image URL, or the URL of ``static/default.jpg`` when the lookup
            fails for any reason: network error/timeout, non-200 status,
            non-JSON body, no ``album`` in the reply, or no usable image URL.
        """
        def default_art():
            return url_for('static', filename='default.jpg')

        url = "http://ws.audioscrobbler.com/2.0/"
        params = {
            "method": "album.getinfo",
            "api_key": LASTFM_API_KEY,
            "artist": artist,
            "album": album,
            "format": "json"
        }
        try:
            # Bounded timeout: without one, a stalled Last.fm request would
            # hang the page render indefinitely.
            response = requests.get(url, params=params, timeout=5)
            if response.status_code != 200:
                return default_art()
            data = response.json()
        except (requests.RequestException, ValueError):
            return default_art()

        album_info = data.get('album') if isinstance(data, dict) else None
        if not isinstance(album_info, dict):
            return default_art()
        images = album_info.get('image') or []
        # The last entry is the largest image; '#text' may be empty or missing.
        largest = images[-1].get('#text') if images and isinstance(images[-1], dict) else None
        return largest or default_art()

    def load_audio_metadata():
        """Return every document of the song metadata collection as a list."""
        return list(collection.find())

    # Snapshot of all song metadata, read once when the module is imported and
    # rendered by the home page; later MongoDB changes need a restart to show.
    audio_files = load_audio_metadata()

    @app.route('/audio/<path:filename>')
    def send_audio(filename):
        """Serve an audio file stored under the audio root directory.

        ``filename`` is resolved relative to the audio root. It is taken from
        ``AUDIO_ROOT`` when that environment variable is set, otherwise from
        the original deployment directory, so existing setups keep working.

        Returns the file inline, or ("File not found", 404).
        """
        # Stored paths may use Windows separators; normalise to forward slashes.
        filename = filename.replace('\\', '/')
        directory = os.environ.get('AUDIO_ROOT', r'/home/hdoop/Documents/project/')
        full_path = os.path.join(directory, filename)
        if not os.path.exists(full_path):
            print(f"File not found: {full_path}")  # Debug output
            return "File not found", 404
        # send_from_directory also refuses paths that escape `directory`.
        return send_from_directory(directory, filename, as_attachment=False)


    @app.route('/')
    def home():
        """Render the landing page with the song list loaded at startup."""
        context = {'audio_files': audio_files}
        return render_template('index.html', **context)


    @app.route('/song/<song_id>')
    def song_details(song_id):
        """Render a song page together with its K-means recommendations.

        Flow:
        1. Look up the song's metadata.
        2. Derive its numeric FMA track id from the stored path.
        3. Publish that id to Kafka via ``send_song_data``.
        4. Poll ``recommended_songs.json`` (written by the Kafka consumer)
           until it holds results for this track or ``timeout`` seconds pass.

        Returns 404 for a malformed or unknown id and 503 on recommendation
        timeout.
        """
        # ObjectId() raises InvalidId on malformed input; treat it as not found.
        if not ObjectId.is_valid(song_id):
            return 'Song not found', 404
        song = collection.find_one({'_id': ObjectId(song_id)})
        if song is None:
            return 'Song not found', 404

        # Tags are optional in the metadata documents, so avoid KeyError.
        album_art_url = get_album_art(song.get('artist'), song.get('album'))
        song_path = song['path']
        # The track id is the 6-digit file name in "fma_large/XXX/NNNNNN.mp3",
        # i.e. 0-based characters 14-19. This must match the K-means label,
        # which is derived with the 1-based substring("path", 15, 6).
        song_number = int(song_path[14:20])
        send_song_data(song_number)

        start_time = time.time()
        timeout = 20
        while time.time() - start_time < timeout:
            try:
                with open('recommended_songs.json', 'r') as json_file:
                    similar_songs = json.load(json_file)
                if similar_songs.get('song_id') == song_number:
                    break
            except (IOError, ValueError, KeyError):
                # File missing or partially written by the consumer; retry.
                pass
            time.sleep(1)
        else:
            return 'Recommendation processing timeout.', 503

        similar_songs_path = similar_songs.get('path') or []

        print(similar_songs_path)
        similar_songs_cursor = collection.find({"path": {"$in": similar_songs_path}})
        # Index by path so results can be returned in the consumer's ranking order.
        similar_songs_dict = {doc['path']: doc for doc in similar_songs_cursor}
        ordered_similar_songs = [similar_songs_dict[path] for path in similar_songs_path if path in similar_songs_dict]

        return render_template('song.html', song=song, album_art_url=album_art_url, similar_songs = ordered_similar_songs)

    if __name__ == '__main__':
        # Werkzeug's interactive debugger allows arbitrary code execution from
        # the browser, so it is only enabled on request (FLASK_DEBUG=1).
        app.run(debug=os.environ.get('FLASK_DEBUG') == '1')

<app.py/>

<producer.py>
    from kafka import KafkaProducer
    import json

    # Kafka producer that JSON-encodes every message value as UTF-8 bytes.
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda value: json.dumps(value).encode('utf-8'),
    )


    def send_song_data(song_id):
        """Publish ``{'song_number': song_id}`` to the Kafka topic 'topic'.

        The producer is flushed right after sending, so the call returns only
        once the buffered message has been handed to the broker.
        """
        payload = {'song_number': song_id}
        producer.send('topic', value=payload)
        print(f"Sent song to topic: {payload}")
        producer.flush()
<producer.py/>

<consumer.py>
    from kafka import KafkaConsumer
    import json
    from pyspark.ml.clustering import KMeansModel
    from scipy.spatial.distance import cosine
    from pyspark.sql import SparkSession

    # Spark session used to load the artefacts saved by the K-means script.
    spark = (
        SparkSession.builder
        .appName("Query Annoy Index for Similar Songs")
        .getOrCreate()
    )

    # (label, features, path) tuples, the feature table, and the trained model.
    feature_rdd = spark.sparkContext.pickleFile("feature_rdd.pickle")
    df = spark.read.parquet('df.parquet')
    model = KMeansModel.load('kmeans_model')
    # Adds a 'prediction' column holding each song's cluster index.
    predictions = model.transform(df)

    # Subscribe to the topic that producer.py publishes song requests to.
    # auto_offset_reset is left at the client default; kafka-python's default
    # is 'latest', i.e. only messages produced after startup are read.
    consumer = KafkaConsumer(
        'topic',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    )

    def get_similar_songs_path(song_number, similar_song_numbers):
        """Map recommended track ids to their stored file paths, keeping order.

        ``song_number`` itself is skipped, as are ids with no record in
        ``feature_rdd``. All ids are resolved with a single Spark job instead
        of one filter/first() job per id.

        Args:
            song_number: Track id of the queried song.
            similar_song_numbers: Ranked track ids to resolve.

        Returns:
            List of paths in the same order as ``similar_song_numbers``.
        """
        wanted = {number for number in similar_song_numbers if number != song_number}
        if not wanted:
            return []
        # feature_rdd records are (label, features, path). Keep the first record
        # seen per label: collect() preserves partition order, which matches
        # what first() returned for a single-label filter.
        path_by_number = {}
        for record in feature_rdd.filter(lambda rec: rec[0] in wanted).collect():
            path_by_number.setdefault(record[0], record[2])
        return [
            path_by_number[number]
            for number in similar_song_numbers
            if number != song_number and number in path_by_number
        ]

    def recommend_top_songs(song_number, predictions, input_song_features, top_n=10):
        """Return up to ``top_n`` track ids from the query song's K-means cluster.

        Candidates are the other members of the cluster assigned to
        ``song_number``, ordered by ascending cosine distance to
        ``input_song_features`` (closest first). The query song is excluded.
        Raises IndexError if ``song_number`` is not present in ``predictions``.
        """
        # 'Label'/'Features' rely on Spark resolving column names
        # case-insensitively (the saved columns are 'label'/'features').
        query_rows = predictions.filter(predictions['Label'] == song_number).select('prediction').collect()
        cluster = query_rows[0]['prediction']

        # Every other member of that cluster, with its feature vector.
        members = predictions.filter(predictions['prediction'] == cluster).select('Label', 'Features').collect()
        candidates = [(row['Label'], row['Features']) for row in members if row['Label'] != song_number]

        # scipy's cosine() is a distance, so smaller means more similar.
        scored = [(label, cosine(input_song_features, features)) for label, features in candidates]
        scored.sort(key=lambda pair: pair[1])
        return [label for label, _distance in scored[:top_n]]


    import os

    print("Consumer started running")
    for message in consumer:
        message_data = message.value
        song_number = message_data.get('song_number') if isinstance(message_data, dict) else None
        if song_number is None:
            print(f"Ignoring malformed message: {message_data}")
            continue
        print(f"Received song for processing: {song_number}")

        try:
            # lookup() returns the second element (the features) of every
            # (label, features, path) record whose label equals song_number.
            matches = feature_rdd.lookup(song_number)
            if not matches:
                print(f"No features stored for song {song_number}; skipping.")
                continue
            song_features = matches[0]
            similar_song_numbers = recommend_top_songs(song_number, predictions, song_features)
            similar_songs_path = get_similar_songs_path(song_number, similar_song_numbers)
        except Exception as e:
            # Keep the consumer alive: one failing request must not stop
            # recommendations for all later requests.
            print(f"Failed to process song {song_number}: {e}")
            continue

        similar_songs = {'song_id': song_number, 'path': similar_songs_path}
        print(similar_songs)
        # app.py polls this file; write a temp file and rename it into place so
        # a reader never sees a partially written JSON document.
        tmp_path = 'recommended_songs.json.tmp'
        with open(tmp_path, 'w') as json_file:
            json.dump(similar_songs, json_file)
        os.replace(tmp_path, 'recommended_songs.json')

<consumer.py/>


# Music Recommendation System

This project develops a streamlined alternative to Spotify: a music recommendation and streaming system. It is designed to assess and implement machine learning algorithms and big data technologies for effective music information retrieval (MIR). The project is structured into several phases:
- an ETL pipeline that extracts features from a large music dataset and stores them;
- Apache Spark for training a music recommendation model;
- a Flask-based web application that serves the model;
- Apache Kafka for generating recommendations in real time.

## Files

### 1. `preprocess.ipynb`

This notebook performs the initial preprocessing of the data. It walks the Free Music Archive (FMA) dataset (`fma_large/`) and extracts Mel-Frequency Cepstral Coefficient (MFCC) features from each MP3, padded or truncated to a fixed length. The features are stored in the `songs_data` collection of the `BDA_Project` MongoDB database. It also reads each file's ID3 tags (title, artist, album, genre, date) into the `songs_meta_data` collection, which the web app uses to list and display songs.

### 2. `music_recommendation_system_using_Kmeans.py`

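This Python script loads the MFCC features from MongoDB into Apache Spark and performs K-means clustering (k = 10) on them. The main goal is to group similar songs based on their features to enhance the recommendation process. The trained model and the feature data (`kmeans_model`, `feature_rdd.pickle`, `df.parquet`) are saved for the evaluation script and the Kafka consumer.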

### 3. `Kmeans_evaluation.py`

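This script evaluates the K-means clustering model trained by the previous script. It loads the saved model and features and computes the silhouette score, which helps in assessing clustering quality. An elbow plot (clustering cost across different values of k) can be used alongside it to determine the optimal number of clusters. As provided, the script computes only the silhouette score.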

### 4. `app.py`

This Flask application serves as the frontend of the music recommendation system. It reads song metadata from MongoDB, lists the songs, shows song details (with album art fetched from Last.fm), and streams the audio files. When a song's page is opened, the app sends the song's track id to Kafka through `producer.py`. It then waits for the consumer's recommendations and displays them alongside the song.

### Additional Files

#### `producer.py`

This script acts as a Kafka producer. `app.py` calls its `send_song_data` function whenever a song's page is opened. The function publishes the song's track id to the Kafka topic `topic` to trigger real-time recommendation.

#### `consumer.py`

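Complementing the producer, this Kafka consumer listens on `topic` for song requests. At startup it loads the saved K-means model and features. For each request it:
1. finds the other songs in the same cluster;
2. ranks them by cosine distance to the requested song;
3. writes the paths of the top 10 to `recommended_songs.json`, which `app.py` reads.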

---

## Installation and Usage

### Setting Up the Environment
1. Ensure Kafka and MongoDB services are running.
2. Create a topic in Kafka named `topic`.

### Execution Instructions
1. **Preprocess the dataset**:  
   Place the FMA audio under `fma_large/` next to the notebook and run all cells of `preprocess.ipynb`. This stores MFCC features in the `songs_data` collection and song metadata in the `songs_meta_data` collection of the `BDA_Project` MongoDB database.
2. **Train the K-means model**:  
   Run `python music_recommendation_system_using_Kmeans.py`. It saves the model (`kmeans_model`), the feature RDD (`feature_rdd.pickle`) and the feature table (`df.parquet`) in the working directory.
3. **Evaluate the clustering**:  
   Run `python Kmeans_evaluation.py` to print the silhouette score of the saved model.
4. **Start the Kafka consumer**:  
   Run `python consumer.py`. It waits for song requests on the `topic` topic and writes recommendations to `recommended_songs.json`. Start it before the web app so that it receives the app's requests.
5. **Start the web app**:  
   Run `python app.py` from the same directory as `consumer.py`, since both use `recommended_songs.json`.
   - The app imports `producer.py` itself, so the producer does not need to be started separately.
   - Audio files are served from the audio root set in `send_audio` in `app.py` (default `/home/hdoop/Documents/project/`); adjust it to the directory that contains `fma_large/`.
   - Open the URL printed by Flask (by default `http://127.0.0.1:5000/`) and select a song to see its recommendations.
6. **Inspect MongoDB (optional)**:  
   - Start MongoDB with `sudo systemctl start mongod`.
   - Open the shell by typing `mongosh` in the terminal.
   - Run `use BDA_Project` to select the database.
   - Run `db.songs_data.countDocuments()` to count the stored feature documents.
   - Run `db.songs_meta_data.find().limit(5).pretty()` to view sample song metadata.


## Contributors
- **Hamza Burney** || 22i-2058
- **Irtiza Abbas** || 22I-1862
- **Zain Abbas** || 22I-1905



---

