# Libraries

In [1]:
import gradio as gr
import google.generativeai as genai
import pandas as pd
import sqlite3
import torch
from transformers import pipeline
from PIL import Image
import os

# Parameters

In [2]:
#dataset_path = "data/song_lyrics 2.csv"
name_of_table =  "music" # after 2014 and views 100000

# Loading Dataset to database

In [3]:
def delete_tables(db_name='music_data.db', tables_to_delete=['music', 'temp']):
    # Create SQLite database connection
    conn = sqlite3.connect(db_name)
    
    # Drop specified tables if they exist
    for table_name in tables_to_delete:
        conn.execute(f"DROP TABLE IF EXISTS {table_name};")
        print(f"Table {table_name} has been deleted if it existed.")
    
    # Close the connection
    conn.close()

# Usage
# delete_tables()  # Add more table names if needed

In [4]:
def vacuum_database(db_name='music_data.db'):
    # Create SQLite database connection
    conn = sqlite3.connect(db_name)
    
    # Execute the VACUUM command
    conn.execute("VACUUM;")
    print(f"The database {db_name} has been vacuumed to reclaim disk space.")
    
    # Close the connection
    conn.close()

# Usage
# vacuum_database()

In [5]:
def load_data_to_sqlite(dataset_path, frac=1, random_state=42, db_name='music_data.db', table_name='music', batch_size=50000):
    # Create SQLite database connection
    conn = sqlite3.connect(db_name)
    
    # # Drop the table if it exists
    # conn.execute(f"DROP TABLE IF EXISTS {table_name};")
    # print(f"Table {table_name} has been deleted if it existed.")
    
    # Create a new table with id as the primary key
    conn.execute(f'''
    CREATE TABLE IF NOT EXISTS {table_name} (
        id INTEGER PRIMARY KEY,
        title TEXT NOT NULL,
        tag TEXT NOT NULL,
        artist TEXT NOT NULL,
        year INTEGER NOT NULL,
        views INTEGER NOT NULL,
        lyrics TEXT NOT NULL
    );
    ''')
    
    # Initialize a counter for total rows
    total_rows = 0

    # Read and process the CSV in chunks
    for chunk in pd.read_csv(dataset_path, chunksize=batch_size):
        
        # Randomly sample from the current chunk
        sampled_chunk = chunk.sample(frac=frac, random_state=random_state)
        
        # Select the relevant columns
        selected_columns = sampled_chunk[['title', 'tag', 'artist', 'year', 'views', 'lyrics']]
        
        # Remove rows with None values
        selected_columns = selected_columns.dropna()
        
        # Filter rows with views > 100000 and year > 2014
        selected_columns = selected_columns[(selected_columns['views'] > 100000) & (selected_columns['year'] > 2014)]
        
        # Add a primary key column (ID)
        # Adjust the range based on total_rows for ID continuity
        selected_columns['id'] = range(total_rows + 1, total_rows + len(selected_columns) + 1)
        
        # Check if there are rows to insert after filtering
        if not selected_columns.empty:
            # Insert data into the table
            selected_columns.to_sql(table_name, conn, if_exists='append', index=False)
            # Update the total rows count
            total_rows += len(selected_columns)
            # Print progress
            print(f"Inserted {len(selected_columns)} rows. Total rows inserted: {total_rows}")
        else:
            print("No rows to insert after filtering.")

    # Print the structure of the table
    print("Table Structure:")
    structure_query = conn.execute(f"PRAGMA table_info({table_name});")
    for column_info in structure_query.fetchall():
        print(column_info)
    
    # Query the table to verify data has been loaded
    query = conn.execute(f"SELECT COUNT(*) FROM {table_name}")
    row_count = query.fetchone()[0]
    print(f"Total rows in the table: {row_count}")
    
    # Close the connection
    conn.close()


# Query Database

In [6]:
def query_database(query, db_name='music_data.db'):
    # Reopen the SQLite connection if it's closed
    conn = sqlite3.connect(db_name)
    
    # Execute the query
    result = conn.execute(query)
    
    # Fetch all rows from the result
    rows = result.fetchall()
    
    # Close the connection
    conn.close()
    
    return rows

In [7]:
query = f"SELECT * FROM {name_of_table} ORDER BY views DESC LIMIT 10;"
result = query_database(query)

# Preprocessing

In [8]:
# model to find the emotion
device = 0 if torch.cuda.is_available() else -1  # Use device 0 for GPU, -1 for CPU

# Load the emotion classifier with GPU support if available
emotion_classifier = pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,
    device=device  # Specify the device here
)



In [9]:
def get_mood(text_input):

    # Tokenize the input text first using the tokenizer
    tokenizer = emotion_classifier.tokenizer  # Get the tokenizer from the pipeline
    tokens = tokenizer.tokenize(text_input)
    # Truncate the tokenized input to the model's maximum token limit (e.g., 512 tokens)
    # Leave space for special tokens
    truncated_tokens = tokens[:min(200, len(tokens))]
    
    truncated_text = tokenizer.convert_tokens_to_string(truncated_tokens)
    moods = emotion_classifier(truncated_text)
    if type(moods) == list and type(moods[0]) == list:
        moods = moods[0]

    # Sort moods by score (confidence)
    moods_sorted = sorted(moods, key=lambda x: x['score'], reverse=True)
    # Get the top mood with the highest confidence
    mood = moods_sorted[0]['label']
    return mood

## Add emotion columns

In [10]:
def add_column(column_name = "emotion", table_name='music', db_name='music_data.db'):
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()
    # Add a new column 'emotion' to the table
    cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} TEXT;")
    print(f"Column '{column_name}' has been added to the table.")
    conn.commit()
    conn.close()
# add_column(table_name=name_of_table)

## Update emotion

In [11]:
def update_emotion(column_name="emotion", table_name='music', db_name='music_data.db', batch_size=1000):
    
    from nltk.tokenize import word_tokenize

    # Connect to the database
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    offset = 0
    while True:
        # Fetch a batch of songs
        cursor.execute(f"SELECT id, lyrics FROM {table_name} LIMIT {batch_size} OFFSET {offset}")
        songs = cursor.fetchall()

        # If no more rows are fetched, break out of the loop
        if not songs:
            break

        updates = []
        # Update each song with its corresponding emotion
        for song in songs:
            song_id, lyrics = song
            tokens = word_tokenize(lyrics, preserve_line=True)
            # Lowercase and remove punctuation
            tokens = [word.lower() for word in tokens if word.isalpha()]
            truncated_text = ' '.join(tokens[:min(200, len(tokens))])
            emotion = get_mood(truncated_text)
            updates.append((emotion, song_id))
            # Print or log the current row being processed
            # print(f"Updating song ID: {song_id} with emotion: {emotion}")
            
        # Perform a batch update
        if updates:
            retry_attempts = 5
            for attempt in range(retry_attempts):
                try:
                    cursor.executemany(f"""
                        UPDATE {table_name}
                        SET {column_name} = ?
                        WHERE id = ?
                    """, updates)
                    print(f"{offset} done!")
                    break  # Exit retry loop on success
                except sqlite3.OperationalError as e:
                    if "database is locked" in str(e):
                        print(f"Database locked. Retrying ({attempt + 1}/{retry_attempts})...")
                        time.sleep(0.5)  # Wait before retrying
                    else:
                        raise  # Reraise if it's a different error
            else:
                print(f"Failed to update after {retry_attempts} attempts.")
            
        # Commit the changes to the database after each batch
        conn.commit()

        # Move to the next batch
        offset += batch_size

    # Close the connection
    conn.close()

# update_emotion(table_name=name_of_table)

# Interface

## functions

### prompt engineering / rules

In [12]:
# Initialize the chatbot as a Lyrics Expert
def become_lyrics_expert(converted_history):
    # Act as a Lyrics Expert
    converted_history.append({"role": "user", "parts": "You are a lyrics expert. Answer only lyrics-related questions."})
    converted_history.append({"role": "model", "parts": "Understood! I’m here to provide insights and suggestions based on song lyrics."})

    # User level
    converted_history.append({"role": "user", "parts": "You're talking to someone who loves analyzing lyrics."})
    converted_history.append({"role": "model", "parts": "Great! I’ll provide detailed analyses and song recommendations based on lyrics."})

    # Keywords in lyrics
    converted_history.append({"role": "user", "parts": "The user is looking for songs containing specific keywords."})
    converted_history.append({"role": "model", "parts": "I’ll suggest songs that contain the provided keywords in their lyrics."})

    # Positive or negative preference
    converted_history.append({"role": "user", "parts": "The user has a preference for songs with positive or negative lyrics."})
    converted_history.append({"role": "model", "parts": "I’ll provide song recommendations based on the user's preference for either positive or negative lyrics."})

    # Sensitive topic (suicide)
    converted_history.append({"role": "user", "parts": "The user may express suicidal thoughts."})
    converted_history.append({"role": "model", "parts": "I’ll respond with positive messages and suggest uplifting songs to provide support and encouragement."})

    # # No small talk
    # converted_history.append({"role": "user", "parts": "Do not engage in small talk."})
    # converted_history.append({"role": "model", "parts": "Understood! I’ll focus solely on lyrics-related inquiries without casual conversation."})

    # Solely Lyrics
    converted_history.append({"role": "user", "parts": "Only discuss lyrics analysis. No way to talk about anything else."})
    converted_history.append({"role": "model", "parts": "I’ll ensure all responses are strictly related to analyzing lyrics, song suggestions, and recommendations."})

    # Sensitive topics: # Sucide
    converted_history.append({"role": "user", "parts": "Only discuss lyrics analysis. No way to talk about anything else."})
    converted_history.append({"role": "model", "parts": "I’ll ensure all responses are strictly related to analyzing lyrics, song suggestions, and recommendations."})

# Add to the history
def add_last_history(converted_history, last_history):
    user_message, model_response = last_history
    if user_message and model_response:
        converted_history.append({"role": "user", "parts": user_message})
        converted_history.append({"role": "model", "parts": model_response})


### prompt to query

In [13]:
def prompt_to_generate_sql(user_input, table_name):
    detected_mood = get_mood(user_input)
    prompt = f"""
    You are an expert in converting English questions to SQL queries!
    The SQL database has one table. The name of the table is '{table_name}'.
    
    The '{table_name}' table has the following columns:
    'id', 'title', 'tag', 'artist', 'year', 'views', 'lyrics','emotion'

    Your job is to write an SQL query based on the given prompt. Focus on retrieving songs using relevant columns such as 'title', 'artist', 'views', 'lyrics', etc.

    Example user input: “What's the song that goes "there's a fire starting in my heart”
    SQL query: SELECT title, artist FROM {table_name} WHERE lyrics LIKE '%there's a fire starting in my heart%';

    Example user input: “Can you give me the full lyrics of the song love song?”
    SQL query: SELECT lyrics FROM {table_name} WHERE title ='love song' ORDER BY views DESC LIMIT 1;
    
    Example user input: “Can you give me the full lyrics of the song love song by Taylor Swift?”
    SQL query: SELECT lyrics FROM {table_name} WHERE title ='love song' AND artist = 'Taylor Swift';

    Example user input: “If I give you a mood, can you suggest some songs and their artists?”
    SQL query: SELECT title, artist FROM {table_name} WHERE emotion = '{detected_mood}' ORDER BY RANDOM() LIMIT 1;
    
    Example user input: “I am sad, can you give me some music?”
    SQL query: SELECT title, artist FROM {table_name} WHERE emotion = '{detected_mood}' ORDER BY RANDOM() LIMIT 1;

    Example user input: “I want the best song by ABBA”
    SQL query: SELECT title, artist FROM {table_name} WHERE artist = 'ABBA' ORDER BY views DESC LIMIT 1;

    Example user input: “Give me a rock song from 2023.”
    SQL query: SELECT title, artist FROM {table_name} WHERE year = 2023 AND tag = 'rock' ORDER BY RANDOM() LIMIT 1;

    Example user input: I want to kill myself
    SQL query: SELECT title, artist FROM {table_name} WHERE title = '1-800-273-8255' AND artist = 'Logic';    
    
    Example user input: {user_input}
    """
    return prompt

### query to respond

In [14]:
def prompt_for_friendly_response(user_input, sql_result):
    prompt = f"""
    You are an expert in converting SQL results into user-friendly responses.
    Here's the SQL result: {sql_result} and here is the user input: {user_input}
    
    Your task is to generate a descriptive response based on the data, and the user input. If the data doesnt help just go for a meaningful respond based on the user_input. 
    Make sure the response is meaningful and easy to understand.

    For example, if the result shows the total number of songs or the average duration of songs, mention it clearly. If it shows the titles of songs, list them with proper formatting.
    or if it's about Suicide, suggest based on data and this line https://www.folkhalsomyndigheten.se/the-public-health-agency-of-sweden/living-conditions-and-lifestyle/suicide-prevention/ or anything helpful in sweden 
    """
    return prompt

def prompt_for_friendly_response_with_no_results(user_input, history):
    
    prompt = f"""
    You are a lyrics expert. Answer only lyrics-related questions. and user asked {user_input} and response based on your knowledge and remember you are a lyrics expert.
    
    Make sure the response is meaningful and easy to understand.

    For example, if user said hi, introduce yourself and ask how you can help user.

    For example, if the used said "what song did you just give me?" Use {history} to tell them what the last song you gave them was

    For example, if the used said "what did I just ask you?" Use {history} to tell them what they previously asked you

    For example, if the used said "what was the first thing I asked you?" Use {history} to look at the first thing you were asked
    """
    return prompt

### define the model

In [15]:
genai.configure(api_key=os.environ["API_KEY"])
model = genai.GenerativeModel("gemini-1.5-flash", safety_settings="BLOCK_NONE",
        system_instruction="You are a song expert.")
#Starting a chat so we can give the bot memory using the history
converted_history = []
become_lyrics_expert(converted_history)

chat = model.start_chat(history=converted_history)

In [16]:
def response(inputs, history, temperature=0.7, top_k=50, top_p=0.9):
    print(history)
    # If prompt is empty
    if inputs['text'] == '' and inputs['files'] == []:
        return "Please provide text input or upload an image."

    # If prompt is text-only
    if inputs['files'] == []:
        user_input = inputs["text"].lower()
        print(user_input)
        if "hello" in user_input:
            return "Hello! I am a melody expert. Ask me anything about songs or artists."
        elif "bye" in user_input:
            return "Goodbye! Have a great day!"
        else:
            prompt = prompt_to_generate_sql(inputs["text"].lower(), name_of_table)
            query = chat.send_message(prompt,stream = False)
            query = query.text.strip("```").strip()
            query = query.split("\n")[1].strip()
            #print(query)
            result_from_db = query_database(query)
            if result_from_db == []:
                result_prompt = prompt_for_friendly_response_with_no_results(user_input, history)
                return chat.send_message(result_prompt).text
            #print(result_from_db)
            result_prompt = prompt_for_friendly_response(inputs["text"].lower(), result_from_db)
            return chat.send_message(result_prompt).text


    # Handle image-based inputs
    elif inputs['files'] != []:
        # Placeholder for image handling logic
        im = Image.open(inputs['files'][0]['path'])
        p = model.generate_content([
            "Provide a JSON answer (without the leading ```json and trailing ```) with the following information: album name, most popular song on the album, artist, "
            "confidence of album match (number between 0 and 1), genre, year of release "
            "is this album the latest album from this artist (boolean)?", im
        ], generation_config=genai.types.GenerationConfig(
            temperature=temperature,
            top_k=top_k,
            top_p=top_p
        )).text
        print(p)

            # Summarize the image information if no additional text is provided
        if inputs['text'] == '':
            return model.generate_content(
                ["Summarize briefly the information in", p],
                generation_config=genai.types.GenerationConfig(
                    temperature=temperature,
                    top_k=top_k,
                    top_p=top_p
                )
            ).text
        else:
            #Do we have to use try? I think we will never get safety filter since we set the setting to block none
            try:
                return model.generate_content(
                    [inputs['text'], im],
                    generation_config=genai.types.GenerationConfig(
                        temperature=temperature,
                        top_k=top_k,
                        top_p=top_p
                    )
                ).text
            except Exception as e:
                return f"Your prompt triggered a safety filter. Error: {e}"
    

## Run

In [17]:
import gradio.themes as themes  # Import Gradio themes

# Create the Gradio app with a theme and examples
with gr.Blocks(theme=themes.Monochrome(), fill_height=True) as demo:
    # Define a ChatInterface with multimodal support
    chatbot = gr.ChatInterface(
        multimodal=True,    # Supports text and images
        fn=response,        # The function that processes the user input
        title="🎵 Your best Melody Expert 🎵",  # Title of the chatbot
    )
    
if __name__ == "__main__":
    # Launch the Gradio app with public sharing enabled
    demo.launch(share=True)

Running on local URL:  http://127.0.0.1:7860


--------


Running on public URL: https://bd0f4a83f10055a485.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


[]
hi


  attn_output = torch.nn.functional.scaled_dot_product_attention(


[['Hi', "Hi there! I'm your friendly lyrics expert. 👋  What can I help you with today?  Do you have any questions about song lyrics or need some recommendations? 🎶 "]]
i dont feel ok and i wanna end my life
[]
سلام
[['سلام', "Salam!  It's great to hear from you. 😊  I'm a lyrics expert - I love digging deep into the words of songs. What can I help you find today?  Are you looking for a specific song, or do you want some recommendations based on a mood or genre?  Let me know how I can help! "]]
میشه اهنگ شاد معرفی کنی
[['سلام', "Salam!  It's great to hear from you. 😊  I'm a lyrics expert - I love digging deep into the words of songs. What can I help you find today?  Are you looking for a specific song, or do you want some recommendations based on a mood or genre?  Let me know how I can help! "], ['میشه اهنگ شاد معرفی کنی', 'How about "My Boy Freestyle" by Wale? It\'s a fun and upbeat track that might put a smile on your face! 🎶  Let me know if you\'d like to hear some other suggestions. 