
Project: Song Recommendation System Based on User Mood
This project aims to create a system that suggests songs based on a user's mood. We will use Spotify and Genius APIs to fetch user data, process this data to create embeddings using a pre-trained transformer model, store these embeddings in a FAISS index, and use LangChain and MLflow to manage the retrieval and generation processes.
 Step-by-Step Guide
 
 1. Setup Environment and Install Dependencies
**Why:** To ensure all necessary packages and tools are available for the project.
**Action:** Install the required libraries such as `lyricsgenius`, `spotipy`, `transformers`, `scikit-learn`, `faiss-cpu`, `tqdm`, and `mlflow`.
**Commands:**


In [None]:
%pip install lyricsgenius
%pip install spotipy
%pip install spotipy lyricsgenius transformers scikit-learn gtts pydub librosa
%pip install faiss-cpu
%pip install tqdm
%pip install torch
%pip install lyricsgenius spotipy transformers scikit-learn gtts pydub librosa faiss-cpu tqdm mlflow
%pip install torch  --index-url https://download.pytorch.org/whl/cu118
%pip install uvicorn
%pip install nest_asyncio
%pip install chromadb
%pip install -U FlagEmbedding
%pip install langchain langchain-community
%pip install sentence-transformers
%pip install peft
%pip install -qU "langchain-chroma>=0.1.2"
%pip install streamlit

In [None]:
%pip install python-dotenv

In [None]:
# Import essential libraries for the project
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import lyricsgenius
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
import faiss
import logging
import psutil  # For monitoring system memory
import gc  # For managing memory through garbage collection
import dotenv
import tqdm as notebook_tqdm
import logging
import os

In [None]:
# Load variables from a local .env file into the process environment so the
# os.getenv(...) lookups in later cells can find the API credentials.
dotenv.load_dotenv()

In [None]:
# Set up logging to monitor and log the flow of execution and potential issues.
# INFO level is applied to the root logger; `logger` is this module's named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Spotify OAuth manager. Client credentials come from the environment
# (SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET); the scope grants read access to
# the user's top items, saved library and private playlists.
sp_oauth = SpotifyOAuth(
    client_id=os.getenv("SPOTIFY_CLIENT_ID"),
    client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
    redirect_uri="http://localhost:8235/callback",  # Ensure this matches your registered Spotify redirect URI
    scope="user-top-read user-library-read playlist-read-private"
)

In [None]:
# Spotify Web API client that authenticates through the `sp_oauth` manager.
sp = spotipy.Spotify(auth_manager=sp_oauth)

In [None]:
# import os
# import logging
# from fastapi import FastAPI, HTTPException, Request, Query
# from fastapi.responses import RedirectResponse, JSONResponse
# import spotipy
# from spotipy.oauth2 import SpotifyOAuth
# from dotenv import load_dotenv
# import lyricsgenius
# from spotipy.exceptions import SpotifyException
# import chromadb
# from chromadb.errors import InvalidCollectionException
# # Import BGEM3FlagModel from FlagEmbedding
# from FlagEmbedding import BGEM3FlagModel

# # Load environment variables from .env file
# load_dotenv()

# # Configure logging
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)

# # Initialize FastAPI app
# app = FastAPI()

# # Spotify OAuth configuration
# sp_oauth = SpotifyOAuth(
#     client_id=os.getenv("SPOTIFY_CLIENT_ID"),
#     client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
#     redirect_uri="http://localhost:8235/callback",  # Ensure this matches your registered Spotify redirect URI
#     scope="user-top-read user-library-read playlist-read-private"
# )

# # Initialize Genius API
# genius = lyricsgenius.Genius(os.getenv("GENIUS_API_TOKEN"))

# # Initialize ChromaDB client
# client = chromadb.Client()


# # Load the BGEM3FlagModel
# model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

# def get_spotify_client():
#     token_info = sp_oauth.get_cached_token()

#     if not token_info:
#         # No valid token, redirect to Spotify authorization
#         raise HTTPException(status_code=307, detail="Redirecting to Spotify authorization", headers={"Location": "/login"})

#     access_token = token_info['access_token']
#     sp = spotipy.Spotify(auth=access_token)
#     return sp

# def get_audio_features_and_analysis(sp, track_id):
#     audio_features = sp.audio_features([track_id])[0]  # Fetching audio features
#     audio_analysis = sp.audio_analysis(track_id)       # Fetching audio analysis
#     return {
#         "audio_features": audio_features,
#         "audio_analysis": audio_analysis
#     }

# def generate_embedding(text: str) -> list:
#     # Use the BGEM3FlagModel to generate embeddings
#     embedding = model.encode(text,return_colbert_vecs=True)  # Replace with the actual method for generating embeddings
#     return embedding

# def store_track_embedding(user_id: str, track_info: dict, embedding: list):
    
#     #check if the user has a collection and either create or get it
#     collection_name = f"{user_id}_liked_songs"
#     if collection_name not in client.list_collections():
#         collection = client.create_collection(collection_name)

#     else:
#         collection = client.get_collection(collection_name)
    

#     # Store the track information and embedding in ChromaDB
#     collection.upsert(
#         ids=[track_info['id']],
#         metadatas=[track_info],
#         embeddings=[embedding]
#     )

# @app.get("/")
# async def read_root():
#     return {"message": "Welcome to the Spotify integration with FastAPI"}

# @app.get("/login")
# async def login():
#     # Step 1: Redirect the user to Spotify's authorization page
#     auth_url = sp_oauth.get_authorize_url()
#     logger.info(f"Redirecting to Spotify's authorization URL: {auth_url}")
#     return RedirectResponse(auth_url)

# @app.get("/callback")
# async def callback(request: Request):
#     # Step 2: Handle the redirect from Spotify and get the access token
#     code = request.query_params.get('code')
#     if not code:
#         raise HTTPException(status_code=400, detail="Missing authorization code")

#     token_info = sp_oauth.get_access_token(code)

#     if token_info:
#         logger.info("Access token obtained successfully!")
#         # Redirect to a default page or the originally requested page
#         return RedirectResponse(url="/")
#     else:
#         raise HTTPException(status_code=401, detail="Could not authenticate with Spotify")

# @app.get("/liked_songs")
# async def liked_songs(limit: int = Query(default=100, description="Number of liked songs to fetch")):
#     try:
#         sp = get_spotify_client()
#         liked_songs = sp.current_user_saved_tracks(limit=limit)
#         detailed_songs = []

#         for item in liked_songs['items']:
#             track = item['track']
#             track_info = {
#                 "name": track['name'],
#                 "album": track['album']['name'],
#                 "artists": [artist['name'] for artist in track['artists']],
#                 "url": track['external_urls']['spotify']
#             }
#             track_details = get_audio_features_and_analysis(sp, track['id'])
#             track_info.update(track_details)
#             detailed_songs.append(track_info)
        
#         logger.info(f"Number of liked songs retrieved: {len(liked_songs['items'])}")
#         logger.info(f"Fetching liked songs up to limit: {min(limit, len(liked_songs['items']))}")
        
#         return {"liked_songs": detailed_songs}
#     except HTTPException as e:
#         if e.status_code == 307:
#             return RedirectResponse(url="/login")
#         raise e
#     except SpotifyException as e:
#         if e.http_status == 429:
#             return JSONResponse(status_code=429, content={"message": "Rate limit exceeded, please try again later."})
#         else:
#             return JSONResponse(status_code=500, content={"message": "An error occurred while fetching liked songs."})

# # @app.get("/lyrics")
# # async def lyrics(artist: str, title: str):
# #     try:
# #         song = genius.search_song(title, artist)
# #         if song:
# #             return {"lyrics": song.lyrics}
# #         else:
# #             raise HTTPException(status_code=404, detail="Lyrics not found")
# #     except HTTPException as e:
# #         raise e

# # @app.get("/singleplaylist")
# # async def singleplaylist(playlist_id: str):
# #     try:
# #         sp = get_spotify_client()
# #         playlist = sp.playlist(playlist_id)
# #         detailed_playlist = {
# #             "name": playlist['name'],
# #             "tracks": []
# #         }

# #         for track in playlist['tracks']['items']:
# #             track_info = {
# #                 "name": track['track']['name'],
# #                 "album": track['track']['album']['name'],
# #                 "artists": [artist['name'] for artist in track['track']['artists']],
# #                 "url": track['track']['external_urls']['spotify']
# #             }
# #             track_details = get_audio_features_and_analysis(sp, track['track']['id'])
# #             track_info.update(track_details)
# #             detailed_playlist['tracks'].append(track_info)
        
# #         return {"playlist": detailed_playlist}
# #     except HTTPException as e:
# #         if e.status_code == 307:
# #             return RedirectResponse(url="/login")
# #         raise e

# @app.get("/store_embeddings")
# async def store_embeddings(limit: int = Query(default=50, description="Number of liked songs to fetch")):
#     try:
#         sp = get_spotify_client()
#         liked_songs = sp.current_user_saved_tracks(limit=limit)
#         embedded_count = 0
#         user_id = sp.current_user()['id']

#         for item in liked_songs['items']:
#             if embedded_count >= limit:
#                 break

#             track = item['track']
#             track_info = {
#                 "id": track['id'],
#                 "name": track['name'],
#                 "album": track['album']['name'],
#                 "artists": [artist['name'] for artist in track['artists']],
#                 "url": track['external_urls']['spotify']
#             }
#             track_details = get_audio_features_and_analysis(sp, track['id'])
#             track_info.update(track_details)

#             # Generate embedding
#             track_text = f"{track_info['name']} by {track_info['artists'][0]} from {track_info['album']}"
#             embedding = generate_embedding(track_text)

#             # Store data in ChromaDB
#             store_track_embedding(user_id, track_info, embedding)
#             embedded_count += 1

#         logger.info(f"Number of songs embedded: {embedded_count}")

#         return {"message": f"Successfully embedded {embedded_count} songs"}
#     except HTTPException as e:
#         if e.status_code == 307:
#             return RedirectResponse(url="/login")
#         raise e
#     except SpotifyException as e:
#         if e.http_status == 429:
#             return JSONResponse(status_code=429, content={"message": "Rate limit exceeded, please try again later."})
#         else:
#             return JSONResponse(status_code=500, content={"message": "An error occurred while embedding songs."})

# if __name__ == "__main__":
#     import uvicorn
#     import nest_asyncio
#     import asyncio
    
#     nest_asyncio.apply()

#     async def start_server():
#         config = uvicorn.Config(app, host="127.0.0.1", port=8235)
#         server = uvicorn.Server(config)
#         await server.serve()

#     # Await the start_server function directly
#     await start_server()

In [None]:

# 1. **Initialize ChromaDB Client**:
#    - Create a ChromaDB client instance.
   
# 2. **Initialize Embedding Model**:
#    - Load the BGEM3FlagModel for generating embeddings.

# 3. **Define Function to Store User Data in ChromaDB**:
#    - Check if the ChromaDB collection (e.g., "spotify_embeddings") exists.
#      - If it doesn't exist, create the collection.
#      - Otherwise, retrieve the existing collection.
#    - Add the user's data (track info and embedding) to the collection using a unique ID.

# 4. **Define `/store_embeddings` Endpoint**:
#    - Get the Spotify client.
#    - Fetch the user's liked songs, limiting the number of songs retrieved to 50.
#    - Initialize a counter (`embedded_count`) to track the number of songs processed.
   
#    - **For each song in the liked songs list**:
#      - If the counter reaches 50, break the loop.
#      - Retrieve track information (e.g., name, album, artists, URL).
#      - Fetch additional track details (audio features and analysis).
#      - Update the track information with these details.
     
#      - **Generate Embedding**:
#        - Create a text representation of the track (e.g., "Track name by Artist from Album").
#        - Generate the embedding using the model.
     
#      - **Store Data in ChromaDB**:
#        - Store the user ID, track information, and embedding in ChromaDB.
     
#      - Increment the counter.

#    - Return a success message with the number of songs embedded.

# 5. **Handle Exceptions**:
#    - If the Spotify token is invalid, redirect to the login page.
#    - If other exceptions occur, raise the appropriate HTTP exception.

# ### Example Flow:

# - User requests to store embeddings via the `/store_embeddings` endpoint.
# - The system retrieves up to 50 liked songs.
# - For each song, it generates an embedding and stores it in ChromaDB, ensuring the limit of 50 songs is not exceeded.
# - The endpoint responds with a success message indicating how many songs were processed.

In [None]:
# import chromadb
# # we need to save the data first and then convert it to chroma format 
# # might want to do as key being user then value being the stats of the data 
# chroma_client = chromadb.Client()


# # TODO : add the user to the database
# # TODO : add the song embeddings to the database using the prev cell 

# # sentences_1 = ["What is BGE M3?", "Defination of BM25"]
# # sentences_2 = ["BGE M3 is an embedding model supporting dense retrieval, lexical matching and multi-vector interaction.", 
# #                "BM25 is a bag-of-words retrieval function that ranks a set of documents based on the query terms appearing in each document"]

# # output_1 = model.encode(sentences_1, return_dense=True, return_sparse=True, return_colbert_vecs=True)
# # output_2 = model.encode(sentences_2, return_dense=True, return_sparse=True, return_colbert_vecs=True)

# # print(model.colbert_score(output_1['colbert_vecs'][0], output_2['colbert_vecs'][0]))
# # print(model.colbert_score(output_1['colbert_vecs'][0], output_2['colbert_vecs'][1]))
# # # 0.7797
# # # 0.4620


# from FlagEmbedding import BGEM3FlagModel
# # model we are using for embedding as ColBERT vectors

# #indexing model
# model = BGEM3FlagModel('BAAI/bge-m3',  use_fp16=True) 





In [None]:
# TODO: embed the user query the same way we embed the stored songs

# from langchain.embeddings import HuggingFaceEmbeddings

# embeddings = HuggingFaceEmbeddings(model_name="distilbert-base-uncased")

# def embed_user_query(user_input):
#     logger.info("Embedding user query using LangChain...")
#     user_embedding = embeddings.embed_query(user_input)
#     logger.info("User query embedded.")
#     user_embedding = np.array(user_embedding) # Reshape to match FAISS input format
#     print("User embedding shape:", user_embedding.shape)  # Debugging: print shape
#     return user_embedding

In [None]:

# FOR ME FROM HERE ONWARD 


In [None]:
# # steps 5
# from langchain.vectorstores import FAISS
# from langchain_community.docstore.in_memory import InMemoryDocstore


# def retrieve_lyrics_with_langchain(query_embedding):
#     logger.info("Performing lyrics retrieval using LangChain...")
#     retriever = FAISS(embedding_function=embeddings.embed_query, index=lyrics_index, docstore=InMemoryDocstore(lyrics_data), index_to_docstore_id={})
#     docs = retriever.similarity_search_by_vector(query_embedding, k=5)
#     logger.info(f"Retrieved top 5 lyrics using LangChain.")
#     return docs

# def retrieve_audio_features_with_langchain(query_embedding):
#     logger.info("Performing audio feature retrieval using LangChain...")
#     retriever = FAISS(embedding_function=embeddings.embed_query, index=audio_index)
#     docs = retriever.similarity_search(query_embedding, k=5)
#     logger.info(f"Retrieved top 5 audio features using LangChain.")
#     return docs
    
# def combine_retrieval_results(lyrics_docs, audio_docs):
#     logger.info("Combining retrieval results...")
#     combined_results = lyrics_docs + audio_docs  # This could be a simple concatenation or more sophisticated merging
#     logger.info(f"Combined {len(combined_results)} results.")
#     return combined_results


In [None]:


# Use the above in the cell with retrieve_lyrics_with_langchain


In [None]:
# # step 6
# def format_recommendations(retrieved_docs):
#     logger.info("Formatting recommendations...")
#     formatted_response = "\n".join([f"Song: {doc.metadata['title']} by {doc.metadata['artist']}\n{doc.page_content[:100]}..." for doc in retrieved_docs])
#     logger.info("Recommendations formatted.")
#     return formatted_response

# from transformers import pipeline

# # Initialize the generation pipeline using an open-source model
# generator = pipeline('text-generation', model='gpt2')

# def generate_personalized_response(formatted_recommendations, user_query):
#     logger.info("Generating personalized response using LangChain...")
#     response = generator(f"Context: {formatted_recommendations}\n\nQuestion: {user_query}\nAnswer:", max_length=200, num_return_sequences=1)
#     return response[0]['generated_text']



In [None]:
# # Retrieve lyrics using the query embedding

# # Example user query
# embeddings = HuggingFaceEmbeddings(model_name="distilbert-base-uncased")

# def embed_user_query(user_input):
#     logger.info("Embedding user query using LangChain...")
#     user_embedding = embeddings.embed_query(user_input)
#     logger.info("User query embedded.")
#     user_embedding = np.array(user_embedding) # Reshape to match FAISS input format
#     print("User embedding shape:", user_embedding.shape)  # Debugging: print shape
#     return user_embedding


# user_query = "summer happy vibes"

# # Create an embedding for the user query
# query_embedding = embed_user_query(user_query)

# # Check if the embedding shape matches what FAISS expects (should be 2D, with one row per item)
# print(f"Query embedding shape: {query_embedding.shape}")

# def retrieve_songs(query):
#     # Preprocess the query to get embeddings
#     query_embedding = preprocess_query(query)
    
#     # Search the FAISS indices
#     lyrics_distances, lyrics_indices = lyrics_index.search(query_embedding, k=5)
#     lyrics_results = [lyrics_data[idx] for idx in lyrics_indices[0]]
    
#     audio_distances, audio_indices = audio_index.search(query_embedding, k=5)
#     audio_results = [tracks[idx] for idx in audio_indices[0]]
    
#     # Combine and rank results
#     combined_results = merge_and_rank_results(lyrics_results, audio_results)
#     return combined_results



# def retrieve_lyrics_with_langchain(query_embedding):
#     logger.info("Performing lyrics retrieval using LangChain...")
#     retriever = FAISS(embedding_function=embeddings.embed_query, index=lyrics_index, docstore=InMemoryDocstore(lyrics_data), index_to_docstore_id={})
#     docs = retriever.similarity_search_by_vector(query_embedding, k=5)
#     logger.info(f"Retrieved top 5 lyrics using LangChain.")
#     return docs


# lyrics_docs = retrieve_lyrics_with_langchain(query_embedding)

# # Check the results
# print("Lyrics retrieval results:")
# for doc in lyrics_docs:
#     print(doc.metadata['title'], doc.metadata['artist'])



In [None]:
# load_dotenv()

# sp_oauth = SpotifyOAuth(
#     client_id=os.getenv("SPOTIFY_CLIENT_ID"),
#     client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
#     redirect_uri="http://localhost:8235/callback",
#     scope="user-top-read user-library-read playlist-read-private playlist-modify-public playlist-modify-private"
# )


# # get new token
# token_info = sp_oauth.get_access_token()
# access_token = token_info['access_token']

In [None]:
# Build the Spotify authorization URL; as the cell's last expression, its value
# is what the notebook displays.
sp_oauth.get_authorize_url()

In [None]:
import os
import logging
from fastapi import FastAPI, HTTPException, Request, Query
from fastapi.responses import RedirectResponse, JSONResponse
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from dotenv import load_dotenv
from spotipy.exceptions import SpotifyException
from langchain_core.documents import Document
from langchain_chroma import Chroma
from FlagEmbedding import BGEM3FlagModel
from uuid import uuid4
import nest_asyncio
import uvicorn
import asyncio
import lyricsgenius

# Load environment variables from .env file so the os.getenv(...) calls below
# can read the Spotify and Genius credentials.
load_dotenv()

# Configure logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app; the route decorators further down register on it.
app = FastAPI()

# Spotify OAuth configuration. Compared with the notebook-level manager created
# earlier, this scope additionally requests playlist-modify-public/private,
# which the playlist-creation endpoint needs.
sp_oauth = SpotifyOAuth(
    client_id=os.getenv("SPOTIFY_CLIENT_ID"),
    client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
    redirect_uri="http://localhost:8235/callback",
    scope="user-top-read user-library-read playlist-read-private playlist-modify-public playlist-modify-private"
)


# Initialize Genius API client (token read from GENIUS_API_TOKEN).
genius = lyricsgenius.Genius(os.getenv("GENIUS_API_TOKEN"))

# Load the BGE-M3 model through FlagEmbedding with fp16 enabled.
# NOTE(review): the Chroma stores below embed through HuggingFaceBgeEmbeddings,
# not through this object — confirm `model` is still used before keeping it loaded.
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)

def get_spotify_client():
    """Build a Spotify client from the OAuth manager's cached token.

    Returns:
        A ``spotipy.Spotify`` instance authenticated with the cached access token.

    Raises:
        HTTPException: status 307 with a ``Location: /login`` header when
            ``sp_oauth`` has no cached token.
    """
    cached_token = sp_oauth.get_cached_token()
    if cached_token:
        return spotipy.Spotify(auth=cached_token['access_token'])

    # Nothing cached yet: tell the caller to send the user through /login.
    raise HTTPException(
        status_code=307,
        detail="Redirecting to Spotify authorization",
        headers={"Location": "/login"},
    )

from langchain_community.embeddings import HuggingFaceBgeEmbeddings 

# LangChain embedding wrapper (HuggingFaceBgeEmbeddings) for the BAAI/bge-m3
# checkpoint; this is the embedding function handed to the Chroma stores below.
model_name = "BAAI/bge-m3"

embedding_function = HuggingFaceBgeEmbeddings(model_name=model_name)

def get_text_collection(user_id: str):
    """Open the Chroma vector store that holds this user's text documents.

    The collection is named ``"<user_id>_text_collection"`` and is persisted
    under ``./chroma_langchain_db`` using the module-level embedding function.
    """
    return Chroma(
        persist_directory="./chroma_langchain_db",
        embedding_function=embedding_function,
        collection_name=f"{user_id}_text_collection",
    )

  


def get_audio_collection(user_id: str):
    """Open the Chroma vector store that holds this user's audio documents.

    The collection is named ``"<user_id>_audio_collection"`` and shares the
    persist directory and embedding function of the text collection.
    """
    return Chroma(
        persist_directory="./chroma_langchain_db",
        embedding_function=embedding_function,
        collection_name=f"{user_id}_audio_collection",
    )

def get_audio_features_and_analysis(sp, track_id):
    """Fetch audio features and audio analysis for a single track.

    Args:
        sp: client exposing ``audio_features(ids)`` and ``audio_analysis(id)``.
        track_id: identifier of the track to look up.

    Returns:
        Dict with ``"audio_features"`` (first entry of the features response)
        and ``"audio_analysis"`` (the analysis response as returned).
    """
    # Features are requested as a one-element batch; keep only that element.
    features_batch = sp.audio_features([track_id])
    details = {"audio_features": features_batch[0]}
    details["audio_analysis"] = sp.audio_analysis(track_id)
    return details


def filter_none_metadata(metadata):
    """
    Drop keys whose value is None from a dict, descending into nested dicts.

    Anything that is not a dict (including lists) is returned unchanged.
    """
    if not isinstance(metadata, dict):
        return metadata

    cleaned = {}
    for key, value in metadata.items():
        if value is None:
            continue
        cleaned[key] = filter_none_metadata(value)
    return cleaned


@app.get("/lyrics")
async def lyrics(artist: str, title: str):
    """Look up a song on Genius and return its lyrics.

    Raises:
        HTTPException: 404 when Genius returns no match for the title/artist.
    """
    song = genius.search_song(title, artist)
    if not song:
        raise HTTPException(status_code=404, detail="Lyrics not found")
    return {"lyrics": song.lyrics}

@app.get("/")
async def read_root():
    """Landing endpoint: returns a static welcome message."""
    welcome = {"message": "Welcome to the Spotify integration with FastAPI"}
    return welcome

@app.get("/login")
async def login():
    """Redirect the browser to Spotify's authorization page."""
    authorize_url = sp_oauth.get_authorize_url()
    logger.info(f"Redirecting to Spotify's authorization URL: {authorize_url}")
    return RedirectResponse(authorize_url)

@app.get("/callback")
async def callback(request: Request):
    """Handle Spotify's OAuth redirect and exchange the code for a token.

    Raises:
        HTTPException: 400 when the ``code`` query parameter is missing,
            401 when the token exchange yields nothing.
    """
    auth_code = request.query_params.get('code')
    if not auth_code:
        raise HTTPException(status_code=400, detail="Missing authorization code")

    if not sp_oauth.get_access_token(auth_code):
        raise HTTPException(status_code=401, detail="Could not authenticate with Spotify")

    logger.info("Access token obtained successfully!")
    return RedirectResponse(url="/")

import json

def convert_lists_to_strings(metadata):
    """
    Return a copy of the metadata dict prepared for storage in ChromaDB.

    Lists become JSON strings, None becomes an empty string, nested dicts are
    converted recursively, and every other value is passed through unchanged.
    """
    def _encode(value):
        if value is None:
            return ""
        if isinstance(value, list):
            return json.dumps(value)
        if isinstance(value, dict):
            return convert_lists_to_strings(value)
        return value

    return {key: _encode(value) for key, value in metadata.items()}


def convert_strings_to_lists(metadata):
    """
    Convert JSON-encoded lists in the metadata back to Python lists after
    retrieving them from ChromaDB.

    Only string values that decode to a JSON *list* are replaced. Every other
    value is kept exactly as stored, so plain strings that happen to be valid
    JSON scalars (e.g. a track called "1989", or "true"/"null") are not turned
    into int/bool/None.

    Args:
        metadata: flat metadata dict as read from the store.

    Returns:
        A new dict with the same keys; the input is not modified.
    """
    restored = {}
    for key, value in metadata.items():
        restored[key] = value
        if not isinstance(value, str):
            continue
        try:
            decoded = json.loads(value)
        except json.JSONDecodeError:
            # Not JSON at all: keep the original string.
            continue
        if isinstance(decoded, list):
            restored[key] = decoded
    return restored

def _lookup_lyrics(title, artist):
    """Best-effort Genius lookup: return the lyrics, or "" if none are found or the call fails."""
    try:
        song = genius.search_song(title, artist)
    except Exception:
        # A failed lookup for one song must not abort the whole batch.
        logger.exception("Genius lookup failed for %r by %r", title, artist)
        return ""
    return (song.lyrics or "") if song else ""


@app.get("/store_embeddings")
async def store_embeddings(
    limit: int = Query(default=50, description="Number of liked songs to fetch"),
    include_lyrics: bool = False,
):
    """Embed the next batch of the user's liked songs into their Chroma collections.

    For every fetched track a text document (name / artists / album, plus
    lyrics when requested) goes into the user's text collection, and a
    metadata-only document carrying the Spotify audio features goes into the
    audio collection. Both use the track ID as document ID.

    Args:
        limit: number of liked songs to request from Spotify in this batch.
        include_lyrics: when true, look up lyrics on Genius for each track and
            store them. Defaults to false, so the stored lyrics field stays
            empty as before and no Genius request is made.

    Returns:
        A message dict with the number of songs embedded; a redirect to
        ``/login`` when no Spotify token is cached; or a JSON error response
        (429 / 500) on Spotify API errors.
    """
    try:
        sp = get_spotify_client()
        user_id = sp.current_user()['id']

        # Get or create vector stores for text and audio data
        text_store = get_text_collection(user_id)
        audio_store = get_audio_collection(user_id)

        # Resume paging right after the songs already stored. This assumes the
        # collection holds the first `count` entries of the liked-songs list.
        offset = text_store._collection.count()
        liked_songs = sp.current_user_saved_tracks(limit=limit, offset=offset)

        text_documents = []
        audio_documents = []
        ids = []

        for item in liked_songs['items']:
            track = item['track']
            track_id = track['id']
            artist_names = [artist['name'] for artist in track['artists']]

            # Lists -> JSON strings and None -> "" so the values are valid Chroma metadata.
            track_info = convert_lists_to_strings({
                "id": track_id,
                "name": track['name'],
                "album": track['album']['name'],
                "artists": artist_names,
                "url": track['external_urls']['spotify'],
            })

            song_lyrics = ""
            if include_lyrics and artist_names:
                song_lyrics = _lookup_lyrics(track_info["name"], artist_names[0])

            # Create the document for the text collection
            text_documents.append(Document(
                page_content=f"{track_info['name']} by {artist_names} from {track_info['album']}\nLyrics: {song_lyrics}",
                metadata={"url": track_info["url"], "track_id": track_id},
            ))
            ids.append(track_id)  # Use track ID as the document ID for both collections

            # The features entry can be None when Spotify has no features for a
            # track; fall back to an empty dict instead of crashing.
            audio_data = convert_lists_to_strings(sp.audio_features([track_id])[0] or {})

            # Combine the track_info and audio features into a single metadata dictionary
            combined_metadata = filter_none_metadata(
                {**track_info, **audio_data, "lyrics": song_lyrics}
            )

            # Create the document for the audio collection
            audio_documents.append(Document(
                page_content="Audio features and analysis data",
                metadata=combined_metadata,
            ))

        # Nothing new to store: skip the writes rather than passing empty batches.
        if ids:
            text_store.add_documents(documents=text_documents, ids=ids)
            audio_store.add_documents(documents=audio_documents, ids=ids)

        logger.info(f"Number of songs embedded for user {user_id}: {len(text_documents)}")

        return {"message": f"Successfully embedded {len(text_documents)} songs for user {user_id}"}
    except HTTPException as e:
        if e.status_code == 307:
            return RedirectResponse(url="/login")
        raise e
    except SpotifyException as e:
        if e.http_status == 429:
            return JSONResponse(status_code=429, content={"message": "Rate limit exceeded, please try again later."})
        else:
            return JSONResponse(status_code=500, content={"message": "An error occurred while embedding songs."})

import logging

# Configure logging
# NOTE(review): this repeats the logging setup done near the top of this cell.
# basicConfig() does nothing once the root logger already has handlers, and
# getLogger(__name__) returns the same logger object, so these lines are redundant.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@app.post("/create_playlist")
async def create_playlist(query: str, k: int = Query(default=5, description="Number of results to fetch")):
    """Create a private playlist named after ``query`` and fill it with matching tracks.

    Tracks come from two sources: the stored-song search and the
    recommendations endpoint. They are gathered *before* the playlist is
    created, so a failure while collecting them does not leave an empty
    playlist behind in the user's account.

    Args:
        query: free-text query; also used as the playlist name.
        k: number of results requested from each source.

    Returns:
        A message dict on success; a redirect to ``/login`` when no Spotify
        token is cached; a JSON error response (429 / 500) on Spotify API
        errors; or the error response of a sub-endpoint, forwarded unchanged.
    """
    try:
        sp = get_spotify_client()
        user_id = sp.current_user()['id']  # Get the current user's ID

        # These are endpoint functions called directly, so on failure they can
        # hand back a Response object instead of a dict; forward it as-is.
        # NOTE(review): `search` is defined elsewhere in the file — presumably it
        # follows the same error-response convention as get_recommendations.
        search_results = await search(query, k)
        if isinstance(search_results, (RedirectResponse, JSONResponse)):
            return search_results
        recommendations = await get_recommendations(query, k)
        if isinstance(recommendations, (RedirectResponse, JSONResponse)):
            return recommendations

        # Extract URIs of tracks from the search results
        search_uris = [
            f"spotify:track:{metadata['track_id']}"
            for metadata in (result['metadata'] for result in search_results['results'])
            if metadata and 'track_id' in metadata
        ]

        # Extract URIs of tracks from the recommendations
        recommendation_uris = [
            f"spotify:track:{track['id']}"
            for track in recommendations['recommendations']
        ]

        # Combine both sources, dropping duplicates while keeping first-seen order.
        song_uris = list(dict.fromkeys(search_uris + recommendation_uris))

        # Create a new playlist with the query as the name
        playlist_name = query
        playlist_description = f"Playlist created based on the search query: {query}"
        new_playlist = sp.user_playlist_create(user_id, name=playlist_name, public=False, description=playlist_description)
        playlist_id = new_playlist['id']

        logger.info(f"Created new playlist: {playlist_name} with ID: {playlist_id}")

        # Add songs to the playlist
        if song_uris:
            sp.playlist_add_items(playlist_id, song_uris)
            logger.info(f"Added {len(song_uris)} songs to the playlist: {playlist_name}")
        return {"message": f"Playlist '{playlist_name}' created and {len(song_uris)} songs added."}

    except HTTPException as e:
        if e.status_code == 307:
            return RedirectResponse(url="/login")
        raise e
    except SpotifyException as e:
        if e.http_status == 429:
            return JSONResponse(status_code=429, content={"message": "Rate limit exceeded, please try again later."})
        else:
            return JSONResponse(status_code=500, content={"message": "An error occurred while creating the playlist."})

def _collect_recommendation_seeds(results, max_seeds=5):
    """Pick seed track and artist IDs for a Spotify recommendations request.

    Spotify accepts at most five seeds in total across tracks, artists and
    genres, so tracks are taken first and artists only fill what is left.

    Args:
        results: Iterable of search-result dicts, each with an optional
            ``'metadata'`` dict that may hold ``'track_id'`` and ``'artists'``.
        max_seeds: Combined cap on the number of seeds returned.

    Returns:
        A ``(seed_tracks, seed_artists)`` tuple of lists whose combined
        length never exceeds ``max_seeds``.
    """
    track_ids = []
    artist_ids = []
    for result in results:
        metadata = result.get('metadata') or {}
        track_id = metadata.get('track_id')
        if track_id:
            track_ids.append(track_id)
        artists = metadata.get('artists')
        # A plain string must be kept whole; list.extend() would otherwise
        # split it into single characters.
        # NOTE(review): assumes 'artists' holds Spotify artist IDs - confirm
        # against the code that writes the metadata.
        if isinstance(artists, str):
            artists = [artists]
        artist_ids.extend(artist for artist in (artists or []) if artist)

    # Drop duplicates while keeping the search ranking order.
    seed_tracks = list(dict.fromkeys(track_ids))[:max_seeds]
    seed_artists = list(dict.fromkeys(artist_ids))[:max_seeds - len(seed_tracks)]
    return seed_tracks, seed_artists


@app.get("/get_recommendations")
async def get_recommendations(query: str, k: int = Query(default=5, description="Number of search results to fetch")):
    """Recommend tracks seeded by the search results for ``query``.

    Args:
        query: Free-text query forwarded to ``search``.
        k: Number of search results to fetch; also used as the
            recommendation limit.

    Returns:
        ``{"recommendations": [...]}`` with the recommended tracks that are
        not among the user's 50 most recently saved tracks; a redirect to
        ``/login`` when authorization is required (HTTPException 307); or a
        JSON error response (429 or 500) when the Spotify API call fails.

    Raises:
        HTTPException: Re-raised unchanged when its status is not 307.
    """
    try:
        sp = get_spotify_client()

        # Step 1: Perform the search and derive seeds from its results
        search_results = await search(query, k)
        seed_tracks, seed_artists = _collect_recommendation_seeds(search_results['results'])

        # Without any seed there is nothing to base a recommendation on.
        if not seed_tracks and not seed_artists:
            return {"recommendations": []}

        # Step 2: Get recommendations based on seeds
        recommendations = sp.recommendations(
            seed_tracks=seed_tracks,
            seed_artists=seed_artists,
            limit=k,  # Adjust the limit as needed
            country="US",  # spotipy sends this as the "market" query parameter
        )

        # Step 3: Filter out tracks already liked by the user
        # NOTE: only the 50 most recently saved tracks are checked.
        liked_songs = sp.current_user_saved_tracks(limit=50)
        liked_track_ids = {item['track']['id'] for item in liked_songs['items']}

        return {
            "recommendations": [
                track for track in recommendations['tracks']
                if track['id'] not in liked_track_ids
            ]
        }

    except HTTPException as e:
        if e.status_code == 307:
            return RedirectResponse(url="/login")
        raise e
    except SpotifyException as e:
        if e.http_status == 429:
            return JSONResponse(status_code=429, content={"message": "Rate limit exceeded, please try again later."})
        else:
            return JSONResponse(status_code=500, content={"message": "An error occurred while fetching recommendations."})

# def format_recommendations(recommendations):
#     formatted_list = []
#     for rec in recommendations:
#         description = rec["text"]
#         formatted_list.append(description)
#     return formatted_list

# def format_response(recommendations):
#     response = "Here are some songs we think you'll like:\n"
    
#     formatted_recommendations = format_recommendations(recommendations)
#     response += "\n".join(formatted_recommendations)
    
#     return response




In [None]:
import nest_asyncio
import uvicorn
import subprocess
from fastapi import FastAPI

# Apply the nest_asyncio patch to allow running FastAPI in a Jupyter notebook
nest_asyncio.apply()


def run_fastapi():
    """Serve ``app`` on 127.0.0.1:8235; blocks until the server stops."""
    uvicorn.run(app, host="127.0.0.1", port=8235)


# uvicorn.run() blocks, so the Streamlit front-end has to be launched first
# and without waiting for it; otherwise it would only start after the API
# server had already shut down.
streamlit_process = subprocess.Popen(["streamlit", "run", "app.py"])
try:
    run_fastapi()
finally:
    # Do not leave the front-end running once the backend is gone.
    streamlit_process.terminate()
    streamlit_process.wait()


In [None]:
if __name__ == "__main__":
    nest_asyncio.apply()
    async def start_server():
        config = uvicorn.Config(app, host="127.0.0.1", port=8235)
        server = uvicorn.Server(config)
        await server.serve()

    await start_server()

In [None]:
%pip freeze > requirements.txt 
 

In [None]:
# Get a Spotify client for the current login
sp = get_spotify_client()

# Get the current user's ID as early as possible; it is used for the
# per-user collection lookup and name below
user_id = sp.current_user()['id']

# Get the text collection for this user
text_store = get_text_collection(user_id)

# Expected name of that collection (not used further in this cell);
# replace with the actual collection name if it's different
collection_name = f"{user_id}_text_collection"

# NOTE: despite its name, this variable holds the store's underlying
# collection object, not a number; the following cell calls .count() on it
curr_number_of_songs = text_store._collection

In [None]:
# Call count() on the collection object captured in the previous cell;
# as the last expression of the cell, its value is displayed by the notebook
curr_number_of_songs.count()

In [None]:
import streamlit as st
import requests

# Set FastAPI backend URL
backend_url = "http://127.0.0.1:8235"

# (connect, read) timeout: fail fast when the backend is not listening, but
# put no limit on the response time because some endpoints may run long.
REQUEST_TIMEOUT = (5, None)


def call_backend(method, path, error_message, params=None):
    """Send a request to the FastAPI backend and return the response on success.

    Args:
        method: HTTP method name, e.g. ``"get"`` or ``"post"``.
        path: Endpoint path appended to ``backend_url``.
        error_message: Text written to the page when the status is not 200.
        params: Optional dict of query parameters.

    Returns:
        The ``requests.Response`` when the status code is 200, otherwise
        ``None`` after the problem has been reported on the page.
    """
    try:
        response = requests.request(
            method,
            f"{backend_url}{path}",
            params=params,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        # A stopped or unreachable backend should show a message, not a traceback.
        st.error(f"Could not reach the backend at {backend_url}: {exc}")
        return None
    if response.status_code != 200:
        st.write(error_message)
        return None
    return response


st.title("Spotify Integration with FastAPI")

# Login button
if st.button("Login to Spotify"):
    response = call_backend("get", "/login", "Error in Spotify login")
    if response is not None:
        st.write("Redirecting to Spotify login...")
        # Final URL after any redirects were followed
        st.write(response.url)

# Search for songs
query = st.text_input("Enter a song or artist to search for:")
k = st.number_input("Number of results:", min_value=1, max_value=50, value=5)

if st.button("Search"):
    response = call_backend("get", "/search", "Error in search", params={'query': query, 'k': k})
    if response is not None:
        results = response.json().get("results", [])
        if results:
            for result in results:
                # Metadata may be missing or incomplete; render what is there.
                metadata = result.get('metadata') or {}
                name = metadata.get('name', 'Unknown title')
                artists = metadata.get('artists', 'Unknown artist')
                st.write(f"**{name}** by {artists}")
                if metadata.get('url'):
                    st.write(metadata['url'])
        else:
            st.write("No results found")

# Store embeddings
if st.button("Store Embeddings"):
    response = call_backend("get", "/store_embeddings", "Error in storing embeddings")
    if response is not None:
        st.write(response.json().get("message"))

# Create playlist
playlist_query = st.text_input("Enter a query to create a playlist based on:")
playlist_k = st.number_input("Number of songs to include in playlist:", min_value=1, max_value=50, value=5)

if st.button("Create Playlist"):
    response = call_backend(
        "post",
        "/create_playlist",
        "Error in creating playlist",
        params={'query': playlist_query, 'k': playlist_k},
    )
    if response is not None:
        st.write(response.json().get("message"))

# Get recommendations
recommend_query = st.text_input("Enter a query to get song recommendations based on:")
recommend_k = st.number_input("Number of recommendations:", min_value=1, max_value=50, value=5)

if st.button("Get Recommendations"):
    response = call_backend(
        "get",
        "/get_recommendations",
        "Error in fetching recommendations",
        params={'query': recommend_query, 'k': recommend_k},
    )
    if response is not None:
        for rec in response.json().get("recommendations", []):
            st.write(f"**{rec['name']}** by {rec['artists'][0]['name']}")
            st.write(rec['external_urls']['spotify'])


In [None]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth


# Load the environment variables. Called through the `dotenv` module, which
# is what the file's top-level imports bring into scope.
dotenv.load_dotenv()

# OAuth helper built from the client credentials in the environment. The
# redirect URI uses port 8235, the same port the FastAPI server is started on.
sp_oauth = SpotifyOAuth(
    client_id=os.getenv("SPOTIFY_CLIENT_ID"),
    client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
    redirect_uri="http://localhost:8235/callback",
    scope="user-top-read user-library-read playlist-read-private playlist-modify-public playlist-modify-private"
)

def get_spotify_client():
    """Return a Spotify client authorised with the cached access token.

    Returns:
        A ``spotipy.Spotify`` instance built from the cached token's
        ``access_token``.

    Raises:
        HTTPException: Status 307 with a ``Location: /login`` header when no
            usable cached token is available.
    """
    # Documented replacement for the deprecated SpotifyOAuth.get_cached_token():
    # read the token from the cache handler and let validate_token() check it.
    token_info = sp_oauth.validate_token(sp_oauth.cache_handler.get_cached_token())

    if not token_info:
        raise HTTPException(status_code=307, detail="Redirecting to Spotify authorization", headers={"Location": "/login"})

    return spotipy.Spotify(auth=token_info['access_token'])




In [None]:
# import os

# # Define the directory structure
# structure = [
#     "app/routes",
#     "app/services",
#     "app/models",
#     "app/utils",
#     "tests"
# ]

# # Define the files to be created
# files = {
#     "app/__init__.py": "",
#     "app/main.py": "",
#     "app/routes/__init__.py": "",
#     "app/routes/auth.py": "",
#     "app/routes/lyrics.py": "",
#     "app/routes/embeddings.py": "",
#     "app/routes/playlists.py": "",
#     "app/services/__init__.py": "",
#     "app/services/spotify_service.py": "",
#     "app/services/chroma_service.py": "",
#     "app/services/genius_service.py": "",
#     "app/models/__init__.py": "",
#     "app/models/user.py": "",
#     "app/models/document.py": "",
#     "app/utils/__init__.py": "",
#     "app/utils/logging.py": "",
#     "app/utils/helpers.py": "",
#     "app/config.py": "",
#     "tests/__init__.py": "",
#     "tests/test_auth.py": "",
#     "tests/test_lyrics.py": "",
#     "tests/test_embeddings.py": "",
#     "tests/test_playlists.py": "",
#     ".env": "",
#     "requirements.txt": "",
#     "README.md": "",
#     "run.py": ""
# }

# # Create the directories
# for directory in structure:
#     os.makedirs(directory, exist_ok=True)

# # Create the files
# for file_path, content in files.items():
#     with open(file_path, "w") as f:
#         f.write(content)

# structure, files


In [None]:
sp = get_spotify_client()

# Fetch ten saved tracks, starting at offset 1, and print one line per track:
# position in this page, first listed artist, track name.
results = sp.current_user_saved_tracks(limit=10, offset=1)
for idx, track in enumerate(entry['track'] for entry in results['items']):
    first_artist = track['artists'][0]['name']
    print(idx, first_artist, " – ", track['name'])