<a href="https://colab.research.google.com/github/n1xd13/n1xd13/blob/main/Lyrics_Driven_Raga_Recommendation_System_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import sys
import random
import time
import datetime
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, f1_score, ConfusionMatrixDisplay

import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer


def seed_everything(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices)
    and asks cuDNN for deterministic kernels.

    Args:
        seed: Integer seed shared by all generators.
    """
    random.seed(seed)
    # Only affects subprocesses started after this point; the hash seed of the
    # running interpreter was fixed at start-up.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Covers every visible GPU; a silent no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick different algorithms from run to run.
    torch.backends.cudnn.benchmark = False


seed_everything()
np.set_printoptions(precision=3)

In [None]:
def read_txt(filepath: str, categories=None) -> pd.DataFrame:
    """Load a ``sentence;label`` text file and integer-encode its labels.

    Args:
        filepath: Path to a headerless, semicolon-separated file with two
            columns (sentence, label).
        categories: Optional ordered list of label strings. When given, the
            code of a label is its position in this list, so every file is
            encoded identically even if it lacks some classes. When ``None``
            (default) the codes follow the sorted unique labels found in
            *this* file.

    Returns:
        DataFrame with columns ``sentence`` (str) and ``label`` (int code).

    Raises:
        ValueError: If ``categories`` is given and the file contains a label
            that is missing from it (or a missing label value).
    """
    df = pd.read_csv(filepath, sep=';', header=None, names=['sentence', 'label'])
    encoded = pd.Categorical(df['label'], categories=categories)

    if categories is not None:
        # pandas encodes values outside ``categories`` as -1; surface that
        # instead of silently training on an invalid class id.
        unknown_mask = encoded.codes == -1
        if unknown_mask.any():
            unknown = sorted(set(df.loc[unknown_mask, 'label'].astype(str)))
            raise ValueError(
                f'{filepath}: labels not present in categories: {unknown}'
            )

    df['label'] = encoded.codes
    return df

# Display names, in the order of the integer label codes (alphabetical).
label_names = ['anger', 'fear', 'joy', 'love', 'sadness', 'surprise']

train_df = read_txt(r'/content/train.txt')
valid_df = read_txt(r'/content/val dataset.txt')
test_df  = read_txt(r'/content/test dataset.txt')

# Labels are integer-encoded separately for each file. A split that lacks one
# of the classes would therefore get shifted codes that no longer line up with
# ``label_names`` -- fail loudly instead of training/evaluating on wrong labels.
for split_name, split_df in (('train', train_df), ('valid', valid_df), ('test', test_df)):
    n_found = split_df['label'].nunique()
    if n_found != len(label_names):
        raise ValueError(
            f'{split_name} split has {n_found} distinct labels, '
            f'expected {len(label_names)}: label codes would not match label_names'
        )

# Inverse-frequency weights computed on the training split only.
class_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(train_df['label']),
    y=train_df['label'],
)

In [None]:
import nltk
# WordNet data backs the WordNetLemmatizer used in clean_lemmatize_tokenize.
nltk.download('wordnet')


[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [None]:
import nltk
# English stopword list read by clean_lemmatize_tokenize via stopwords.words('english').
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Compiled once at import time; raw strings avoid invalid-escape warnings.
_REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;]')    # add/remove regex as required
_BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_]')
_NUMBERS_RE = re.compile(r'\d+')

# Filled on first use so the NLTK corpora are only touched once.
_NLP_RESOURCES = {}


def _get_nlp_resources():
    """Return the cached ``(stopword set, lemmatizer)`` pair, building it on first use."""
    if not _NLP_RESOURCES:
        _NLP_RESOURCES['stopwords'] = frozenset(stopwords.words('english'))
        _NLP_RESOURCES['lemmatizer'] = WordNetLemmatizer()
    return _NLP_RESOURCES['stopwords'], _NLP_RESOURCES['lemmatizer']


def clean_lemmatize_tokenize(text: str) -> list[str]:
    """Normalise a sentence and split it into lemmatized, stopword-free tokens.

    Steps: lowercase, replace bracket/punctuation separators with spaces,
    drop every character outside ``0-9a-z #+_``, remove digit runs, split on
    whitespace, drop English stopwords, lemmatize each remaining token.

    Args:
        text: Raw input sentence.

    Returns:
        List of cleaned tokens; empty when nothing survives the filtering.
    """
    stop_words, lemmatizer = _get_nlp_resources()

    # clean
    text = text.lower()
    text = _REPLACE_BY_SPACE_RE.sub(' ', text)
    text = _BAD_SYMBOLS_RE.sub('', text)
    text = _NUMBERS_RE.sub('', text)

    # remove stopwords and lemmatize
    return [
        lemmatizer.lemmatize(word)
        for word in text.split()
        if word not in stop_words
    ]

train_df['tokenized'] = train_df['sentence'].apply(clean_lemmatize_tokenize)
valid_df['tokenized'] = valid_df['sentence'].apply(clean_lemmatize_tokenize)
test_df['tokenized']  =  test_df['sentence'].apply(clean_lemmatize_tokenize)

In [None]:
def make_vocabulary_from_tokens(
    tokenized_sentences: pd.Series,
    min_doc_freq: int = 1,
    max_doc_freq: int = 1_000_000
) -> tuple[dict[str, int], dict[str, int]]:
    """Build a token -> id vocabulary from tokenized sentences.

    Args:
        tokenized_sentences: Iterable of token lists.
        min_doc_freq: Smallest total occurrence count a token needs to be
            kept (inclusive).
        max_doc_freq: Largest total occurrence count a token may have to be
            kept (inclusive).

    Returns:
        ``(vocab, qualified_tokens)`` where ``vocab`` maps each kept token to
        an id starting at 2 (in first-seen order) plus ``'[PAD]' -> 0`` and
        ``'[UNK]' -> 1``, and ``qualified_tokens`` maps each kept token to
        its occurrence count.
    """
    from collections import Counter

    # Total number of occurrences of each token across the whole dataset.
    token_freq = Counter(
        token
        for tokenized_sentence in tokenized_sentences
        for token in tokenized_sentence
    )

    # Keep tokens whose count lies in [min_doc_freq, max_doc_freq]. The bounds
    # are inclusive so the default min_doc_freq=1 keeps every token.
    qualified_tokens = {
        token: freq
        for token, freq in token_freq.items()
        if min_doc_freq <= freq <= max_doc_freq
    }

    # Ids 0 and 1 are reserved for the special tokens added below.
    vocab = {token: token_id + 2 for token_id, token in enumerate(qualified_tokens)}

    # Add special tokens
    vocab['[PAD]'] = 0
    vocab['[UNK]'] = 1

    return vocab, qualified_tokens

# The vocabulary is fitted on the training split only, so no validation/test
# tokens leak into it.
vocab, doc_freq = make_vocabulary_from_tokens(
    tokenized_sentences=train_df['tokenized'],
    min_doc_freq=3,
)

# Report the vocabulary size (including the two special tokens) and the
# number of tokens that passed the frequency filter.
print(f'len(vocab)={len(vocab)}')
print(f'len(doc_freq)={len(doc_freq)}')

len(vocab)=3814
len(doc_freq)=3812


In [None]:
def tokens_to_input_ids(tokenized_sentence: list[str], vocabulary=None) -> list[int]:
    """Map tokens to their integer ids.

    Args:
        tokenized_sentence: Tokens of one sentence.
        vocabulary: Token -> id mapping. Defaults to the module-level
            ``vocab``, looked up at call time so a rebuilt vocabulary is
            picked up without redefining this function.

    Returns:
        List of ids, one per token; tokens missing from the vocabulary get
        the ``'[UNK]'`` id (1 when the vocabulary defines none).
    """
    if vocabulary is None:
        vocabulary = vocab
    unk_id = vocabulary.get('[UNK]', 1)
    return [vocabulary.get(token, unk_id) for token in tokenized_sentence]

# Encode every split: token lists become id lists (a Series of lists), labels
# become plain Python lists of integer codes.
X_train = train_df['tokenized'].apply(tokens_to_input_ids)
y_train = train_df['label'].to_list()

X_valid = valid_df['tokenized'].apply(tokens_to_input_ids)
y_valid = valid_df['label'].to_list()

X_test = test_df['tokenized'].apply(tokens_to_input_ids)
y_test = test_df['label'].to_list()

In [None]:
# Hyper parameters etc.
class CFG:
    # Training hyperparameters and display settings read by the training and
    # evaluation cells.
    n_epochs = 20
    learning_rate = 1.0e-3
    batch_size = 64
    # Number of full batches in the training set (floor division ignores a
    # trailing partial batch).
    batches_per_epoch = len(X_train) // batch_size
    # The right-hand side reads the module-level ``label_names`` (the class
    # attribute is not bound yet); the global full-length list is left untouched.
    label_names = [name[:3] for name in label_names]   # first few letters only for plots etc


class LSTMClassifier(nn.Module):
    """Embedding -> LSTM -> dropout -> linear classifier over token-id sequences.

    Args:
        vocab_size: Number of embedding rows. Defaults to ``len(vocab)``.
        n_labels: Number of output classes. Defaults to ``len(label_names)``.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the LSTM hidden state.
        dropout_rate: Dropout applied to the embeddings and to the final
            LSTM output.
        lstm_dropout_rate: Dropout between stacked LSTM layers; only used
            when ``num_lstm_layers > 1``.
        num_lstm_layers: Number of stacked LSTM layers.
        pad_token_id: Id used for right-padding; those positions are ignored
            when the final hidden output is selected.
    """

    def __init__(
        self,
        vocab_size=None,
        n_labels=None,
        embedding_dim=256,
        hidden_dim=128,
        dropout_rate=0.25,
        lstm_dropout_rate=0.2,
        num_lstm_layers=1,
        pad_token_id=0,
    ):
        super().__init__()

        # Fall back to the notebook globals so ``LSTMClassifier()`` keeps working.
        if vocab_size is None:
            vocab_size = len(vocab)
        if n_labels is None:
            n_labels = len(label_names)

        self.n_layers = num_lstm_layers
        self.hidden_dim = hidden_dim
        self.pad_token_id = pad_token_id

        # padding_idx keeps the pad embedding at zero and out of the gradient.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_token_id)
        self.lstm = nn.LSTM(
            embedding_dim, hidden_dim, num_layers=num_lstm_layers,
            batch_first=True,
            # nn.LSTM only applies dropout between layers.
            dropout=lstm_dropout_rate if num_lstm_layers > 1 else 0.0,
        )
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, n_labels)

    def forward(self, X_batch):
        """Return class logits of shape ``(batch, n_labels)``.

        Args:
            X_batch: Integer tensor of token ids, shape ``(batch, seq_len)``,
                right-padded with ``pad_token_id``.
        """
        embedding = self.dropout(self.embedding(X_batch))

        # Omitting the initial state lets nn.LSTM start from zeros on the
        # right device; a random state made predictions nondeterministic.
        output, _ = self.lstm(embedding)

        # Pick the output at each sequence's last real token rather than at
        # the last (usually padded) timestep. Assumes right-padding.
        lengths = (X_batch != self.pad_token_id).sum(dim=1).clamp(min=1)
        batch_index = torch.arange(X_batch.size(0), device=X_batch.device)
        last_output = output[batch_index, lengths - 1]

        return self.fc(self.dropout(last_output))


model = LSTMClassifier()

# Loss function and optimizer
# Class weights compensate for the unbalanced train set. compute_class_weight
# returns float64, while the model's logits are float32; the weights must be
# float32 too, otherwise CrossEntropyLoss raises
# "expected scalar type Float but found Double".
loss_fn = nn.CrossEntropyLoss(
    weight=torch.tensor(class_weights, dtype=torch.float32)
)
optimizer = torch.optim.Adam(model.parameters(), lr=CFG.learning_rate)

# Training performance metric
def weighted_f1(y_true, y_pred):
    """Support-weighted F1 score for a batch.

    Each argument may be a 1-D tensor of class ids or a 2-D tensor of
    per-class scores (logits / one-hot), which is reduced with ``argmax``
    over dimension 1. The data loaders yield 1-D integer labels, so an
    unconditional ``argmax(1)`` on ``y_true`` would fail.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels or logits.

    Returns:
        The weighted-average F1 score as computed by ``sklearn.metrics.f1_score``.
    """
    def _to_class_ids(values):
        tensor = torch.as_tensor(values).detach().cpu()
        if tensor.dim() > 1:
            tensor = tensor.argmax(dim=1)
        return tensor.numpy()

    return f1_score(_to_class_ids(y_true), _to_class_ids(y_pred), average='weighted')

In [None]:
class TextDataset(Dataset):
    """In-memory dataset of right-padded token-id sequences and integer labels."""

    def __init__(self, input_ids: list[list[int]], labels: list[int]):
        '''
        - Stores tokenized sentences as one padded tensor of input ids
          (padding value 0, padded to the longest sequence in the dataset).
        - Labels are directly passed as integers.

        Raises:
            ValueError: If the number of sequences and labels differ.
        '''
        if len(input_ids) != len(labels):
            raise ValueError(
                f'got {len(input_ids)} sequences but {len(labels)} labels'
            )
        # An explicit dtype keeps empty sequences integer-typed;
        # torch.tensor([]) would otherwise default to float32.
        self.input_ids = pad_sequence(
            [torch.tensor(sequence, dtype=torch.long) for sequence in input_ids],
            batch_first=True,
        )
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # [padded input ids, label] for sample ``idx``.
        return [self.input_ids[idx], self.labels[idx]]

def collate_fn(batch, pad_token_id=0):
    """Stack dataset samples into a batch and trim shared trailing padding.

    Args:
        batch: List of ``[input_ids, label]`` samples whose ``input_ids``
            tensors all have the same length.
        pad_token_id: Id that marks padding positions.

    Returns:
        ``[inputs, labels]`` where ``inputs`` has shape
        ``(batch, trimmed_len)`` and ``labels`` has shape ``(batch,)``.
        Columns that are padding for every sample in the batch are dropped
        from the right; at least one column is kept when any exist.
    """
    inputs = torch.stack([sample[0] for sample in batch])
    labels = torch.stack([torch.as_tensor(sample[1]) for sample in batch])

    # Index of the last column holding a real token in any sample.
    used_columns = (inputs != pad_token_id).any(dim=0).nonzero()
    keep = int(used_columns.max()) + 1 if used_columns.numel() else 1

    return [inputs[:, :keep], labels]

In [None]:
def _run_epoch(model, loader, loss_fn, metric, optimizer=None):
    """Run one pass over ``loader``: train when ``optimizer`` is given, else evaluate.

    Returns ``(mean loss, mean metric)`` averaged over batches, or
    ``(nan, nan)`` when the loader yields no batches.
    """
    training = optimizer is not None
    model.train(training)

    total_loss = 0.0
    total_score = 0.0
    n_batches = 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = loss_fn(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            # .item()/float() keep the accumulators free of autograd history.
            total_loss += loss.item()
            total_score += float(metric(labels, outputs.detach()))
            n_batches += 1

    if n_batches == 0:
        return float('nan'), float('nan')
    return total_loss / n_batches, total_score / n_batches


def train_model(
    X_train, y_train,
    X_valid, y_valid,
    model, optimizer, metric, loss_fn, collate_fn, CFG,
):
    """Train ``model``, validate after every epoch, checkpoint and plot.

    Args:
        X_train, y_train: Training id sequences and integer labels.
        X_valid, y_valid: Validation id sequences and integer labels.
        model: Module mapping a batch of ids to class logits.
        optimizer: Optimizer over ``model``'s parameters.
        metric: Callable ``metric(labels, outputs)`` returning a number.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.
        collate_fn: Passed to both ``DataLoader``s (``None`` = default collation).
        CFG: Object providing ``batch_size`` and ``n_epochs``.

    Side effects:
        Saves ``model.state_dict()`` to ``model_<timestamp>_<epoch>`` whenever
        the validation loss improves, prints one line per epoch and draws the
        loss/score curves with matplotlib.
    """
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    train_loss, valid_loss = [], []
    train_score, valid_score = [], []

    train_loader = DataLoader(
        TextDataset(X_train, y_train),
        batch_size=CFG.batch_size, collate_fn=collate_fn, shuffle=True
    )
    valid_loader = DataLoader(
        TextDataset(X_valid, y_valid),
        batch_size=CFG.batch_size, collate_fn=collate_fn, shuffle=False
    )

    best_vloss = float('inf')

    for epoch in range(CFG.n_epochs):
        # Epoch averages come from the running sums over all batches, not
        # from the last batch's values.
        avg_loss, avg_score = _run_epoch(model, train_loader, loss_fn, metric, optimizer)
        avg_vloss, avg_vscore = _run_epoch(model, valid_loader, loss_fn, metric)

        train_loss.append(avg_loss)
        train_score.append(avg_score)
        valid_loss.append(avg_vloss)
        valid_score.append(avg_vscore)

        # Track best performance and save model's state
        if avg_vloss < best_vloss:
            best_vloss = avg_vloss
            model_path = 'model_{}_{}'.format(timestamp, epoch)
            torch.save(model.state_dict(), model_path)

        print(f'Epoch {epoch}: loss = {avg_loss} score = {avg_score} | vloss = {avg_vloss} vscore = {avg_vscore}')

    # Plot loss and metric
    epochs = np.arange(len(train_loss))
    fig, (ax1, ax2) = plt.subplots(1, 2)

    ax1.set_xlabel('epoch'); ax1.set_ylabel('loss'); ax1.set_title('training loss')
    ax1.plot(epochs, train_loss, label='training')
    ax1.plot(epochs, valid_loss, label='validation')
    ax1.legend()

    ax2.set_xlabel('epoch'); ax2.set_ylabel('score'); ax2.set_title('training score')
    ax2.plot(epochs, train_score, label='training')
    ax2.plot(epochs, valid_score, label='validation')
    ax2.legend()

In [None]:
# Train with the default DataLoader collation (collate_fn=None).
train_model(
    X_train=X_train, y_train=y_train,
    X_valid=X_valid, y_valid=y_valid,
    model=model,
    optimizer=optimizer,
    metric=weighted_f1,
    loss_fn=loss_fn,
    collate_fn=None,
    CFG=CFG,
)

RuntimeError: expected scalar type Float but found Double

In [None]:
def test_model(X, y, model):
    """Evaluate ``model`` on a labelled set and report per-class metrics.

    Prints a classification report and draws a confusion matrix.

    Args:
        X: Id sequences, one list of ints per sample.
        y: Integer class labels, same length as ``X``.
        model: Module mapping a batch of ids to class logits.

    Returns:
        1-D tensor with the predicted class id of every sample, in input order.
    """
    model.eval()

    test_loader = DataLoader(
        TextDataset(X, y),
        batch_size=CFG.batch_size, shuffle=False
    )

    batch_preds = []
    with torch.no_grad():
        for inputs, _ in test_loader:
            batch_preds.append(model(inputs).argmax(1))

    y_preds = torch.cat(batch_preds)

    # scikit-learn works on array-likes; hand it a CPU NumPy copy.
    y_preds_np = y_preds.cpu().numpy()
    # Explicit class ids keep the report aligned with ``label_names`` even
    # when a class is missing from both ``y`` and the predictions.
    class_ids = list(range(len(label_names)))

    print(classification_report(y, y_preds_np, labels=class_ids, target_names=label_names))
    ConfusionMatrixDisplay.from_predictions(
        y, y_preds_np, labels=class_ids, display_labels=label_names
    )

    return y_preds

In [None]:
# Report metrics on the validation split and keep its predictions.
vpreds = test_model(X=X_valid, y=y_valid, model=model)

In [None]:
# Report metrics on the held-out test split and keep its predictions.
preds = test_model(X=X_test, y=y_test, model=model)

In [None]:
def predict_emotion(input_text, model, vocabulary, label_names):
    """Classify a single English sentence.

    Args:
        input_text: Raw English text.
        model: Module mapping a ``(1, seq_len)`` id tensor to class logits.
        vocabulary: Token -> id mapping used to encode the text.
        label_names: Sequence of label names indexed by class id.

    Returns:
        The entry of ``label_names`` for the highest-scoring class.
    """
    # Tokenize input text
    tokenized_input = clean_lemmatize_tokenize(input_text)
    input_ids = tokens_to_input_ids(tokenized_input, vocabulary)

    # Text made only of stopwords/symbols yields no tokens; feed one unknown
    # token so the model receives a non-empty sequence.
    if not input_ids:
        input_ids = [vocabulary.get('[UNK]', 1)]

    # Convert input_ids to an integer tensor and add the batch dimension
    input_tensor = torch.tensor(input_ids, dtype=torch.long).unsqueeze(0)

    # Get model prediction
    model.eval()
    with torch.no_grad():
        output = model(input_tensor)

    # Get predicted emotion label
    predicted_label = output.argmax(1).item()
    return label_names[predicted_label]

# Example usage
# Classify one hand-written English sentence as a smoke test.
input_text = "I am feeling loved that i get to love again"
predicted_emotion = predict_emotion(
    input_text=input_text,
    model=model,
    vocabulary=vocab,
    label_names=CFG.label_names,
)
print(f"Predicted emotion: {predicted_emotion}")


In [None]:
pip install googletrans==4.0.0-rc1

In [None]:
from googletrans import Translator

# Function to translate text from Hindi to English
def translate_to_english(text, src='hi'):
    """Translate ``text`` to English with googletrans.

    Args:
        text: Text to translate.
        src: Source language code passed to googletrans (default Hindi).

    Returns:
        The translated English text, or ``""`` for blank input (no request
        is sent in that case).
    """
    if not text or not text.strip():
        return ""
    translated_text = Translator().translate(text, src=src, dest='en')
    return translated_text.text

# Assuming predict_emotion function takes English text as input
# Fixed Hindi sample sentence; predict_emotion expects English text, so it is
# translated first.
input_text_hindi = "मुझे तुमसे प्यार है"
input_text_english = translate_to_english(input_text_hindi)

# Classify the translated sentence.
predicted_emotion = predict_emotion(
    input_text=input_text_english,
    model=model,
    vocabulary=vocab,
    label_names=CFG.label_names,
)
print(f"Predicted emotion: {predicted_emotion}")


In [None]:
from googletrans import Translator

# Function to translate text from Hindi to English
def translate_to_english(text, src='hi'):
    """Translate ``text`` to English with googletrans.

    Args:
        text: Text to translate.
        src: Source language code passed to googletrans (default Hindi).

    Returns:
        The translated English text, or ``""`` for blank input (no request
        is sent in that case).
    """
    if not text or not text.strip():
        return ""
    translated_text = Translator().translate(text, src=src, dest='en')
    return translated_text.text

# Function to take input from the user
def get_user_input():
    """Prompt on standard input for Hindi text and return what was typed."""
    return input("Enter your text in Hindi: ")

# Get input from the user
# Interactive flow: read Hindi text, translate it, classify the translation.
input_text_hindi = get_user_input()
input_text_english = translate_to_english(input_text_hindi)

predicted_emotion = predict_emotion(
    input_text=input_text_english,
    model=model,
    vocabulary=vocab,
    label_names=CFG.label_names,
)
print(f"Predicted emotion: {predicted_emotion}")


In [None]:
from googletrans import Translator

# Dictionary mapping emotions to recommended ragas
raga_recommendations = {
    "anger": "Bhairavi",
    "fear": "Malkauns",
    "joy": "Bilawal",
    "love": "Yaman",
    "sadness": "Ahir Bhairav",
    "surprise": "Darbari Kanada",
}

# Maps the model's abbreviated labels (CFG.label_names keeps only the first
# three letters of each name) to the full emotion names used above.
emotion_mapping = {
    "ang": "anger",
    "fea": "fear",    # the abbreviation actually produced for "fear"
    "fear": "fear",   # kept so full-length lookups still resolve
    "joy": "joy",
    "lov": "love",
    "sad": "sadness",
    "sur": "surprise",
}

# Function to translate text from Hindi to English
def translate_to_english(text, src='hi'):
    """Translate ``text`` to English with googletrans.

    Args:
        text: Text to translate.
        src: Source language code passed to googletrans (default Hindi).

    Returns:
        The translated English text, or ``""`` for blank input (no request
        is sent in that case).
    """
    if not text or not text.strip():
        return ""
    translated_text = Translator().translate(text, src=src, dest='en')
    return translated_text.text

# Function to take input from the user
def get_user_input():
    """Prompt on standard input for text and return what was typed."""
    return input("Enter your text: ")

# Get input from the user
# Read the user's text, translate it to English and classify it.
user_input = get_user_input()
input_text_english = translate_to_english(user_input)
predicted_emotion = predict_emotion(
    input_text=input_text_english,
    model=model,
    vocabulary=vocab,
    label_names=CFG.label_names,
)

# Expand the model's abbreviated label to the full emotion name, then look up
# the raga for it; either lookup falls back to "Unknown".
predicted_emotion_lower = predicted_emotion.lower()
predicted_emotion_mapped = emotion_mapping.get(predicted_emotion_lower, "Unknown")
recommended_raga = raga_recommendations.get(predicted_emotion_mapped, "Unknown")

# Show the result.
print(f"Predicted emotion: {predicted_emotion_mapped}")
print(f"Recommended raga based on your emotion: {recommended_raga}")


In [None]:
pip install streamlit

In [None]:
# prompt: !pip install --upgrade streamlit

!pip install --upgrade streamlit


In [None]:
!pip install pyngrok

In [None]:
import streamlit as st
from googletrans import Translator

# Dictionary mapping emotions to recommended ragas
raga_recommendations = {
    "anger": "Bhairavi",
    "fear": "Malkauns",
    "joy": "Bilawal",
    "love": "Yaman",
    "sadness": "Ahir Bhairav",
    "surprise": "Darbari Kanada",
}

# Maps the model's abbreviated labels (CFG.label_names keeps only the first
# three letters of each name) to the full emotion names used above.
emotion_mapping = {
    "ang": "anger",
    "fea": "fear",    # the abbreviation actually produced for "fear"
    "fear": "fear",   # kept so full-length lookups still resolve
    "joy": "joy",
    "lov": "love",
    "sad": "sadness",
    "sur": "surprise",
}

# Function to translate text from Hindi to English
def translate_to_english(text, src='hi'):
    """Translate ``text`` to English with googletrans.

    Args:
        text: Text to translate.
        src: Source language code passed to googletrans (default Hindi).

    Returns:
        The translated English text, or ``""`` for blank input (no request
        is sent in that case).
    """
    if not text or not text.strip():
        return ""
    translated_text = Translator().translate(text, src=src, dest='en')
    return translated_text.text

# Function to predict emotion (placeholder for demonstration)
def predict_emotion(text):
    """Classify English ``text`` with the trained model and log the recommendation.

    This one-argument definition replaces the earlier four-argument
    ``predict_emotion`` in the module namespace, so it runs the inference
    itself instead of calling that name (which would now resolve to this
    function and fail with a TypeError).

    Args:
        text: English text to classify.

    Returns:
        The model's abbreviated label (an entry of ``CFG.label_names``), which
        the caller maps to a full emotion name and a raga.
    """
    input_ids = tokens_to_input_ids(clean_lemmatize_tokenize(text), vocab)
    # Text made only of stopwords/symbols yields no tokens; feed one unknown
    # token so the model receives a non-empty sequence.
    if not input_ids:
        input_ids = [vocab.get('[UNK]', 1)]
    input_tensor = torch.tensor(input_ids, dtype=torch.long).unsqueeze(0)

    model.eval()
    with torch.no_grad():
        output = model(input_tensor)
    predicted_emotion = CFG.label_names[output.argmax(1).item()]

    # Convert predicted emotion to lowercase and map to expected labels
    predicted_emotion_lower = predicted_emotion.lower()
    predicted_emotion_mapped = emotion_mapping.get(predicted_emotion_lower, "Unknown")

    # Get the recommended raga based on the mapped predicted emotion
    recommended_raga = raga_recommendations.get(predicted_emotion_mapped, "Unknown")

    # Log the predicted emotion and recommended raga to stdout
    print(f"Predicted emotion: {predicted_emotion_mapped}")
    print(f"Recommended raga based on your emotion: {recommended_raga}")

    return predicted_emotion



# Streamlit app
def main():
    """Streamlit page: Hindi text in, predicted emotion and recommended raga out."""
    st.title("Emotion Raga Recommender")

    # Get input text from the user
    input_text = st.text_area("Enter your text in Hindi:")

    # Check if input text is empty
    if not input_text or not input_text.strip():
        st.write("Please enter some text in Hindi.")
        return

    # Streamlit re-runs the script on every interaction; do nothing further
    # (in particular no network call) until the button is pressed.
    if not st.button("Predict Emotion"):
        return

    # Translate input text from Hindi to English
    try:
        input_text_english = translate_to_english(input_text)
    except Exception as exc:  # UI boundary: report the failure instead of crashing the page
        st.error(f"Translation failed: {exc}")
        return

    # Predict emotion based on translated text
    predicted_emotion = predict_emotion(input_text_english)

    # Convert predicted emotion to lowercase and map to expected labels;
    # a missing prediction falls through to "Unknown".
    predicted_emotion_mapped = emotion_mapping.get((predicted_emotion or "").lower(), "Unknown")

    # Get the recommended raga based on the mapped predicted emotion
    recommended_raga = raga_recommendations.get(predicted_emotion_mapped, "Unknown")

    # Display the predicted emotion and recommended raga
    st.write(f"Predicted emotion: {predicted_emotion_mapped}")
    st.write(f"Recommended raga based on your emotion: {recommended_raga}")


if __name__ == "__main__":
    main()


In [None]:
from pyngrok import ngrok

In [None]:
!wget -q -O - ipv4.icanhazip.com

In [None]:
! streamlit run keshav.py & npx localtunnel --port 8501

/bin/bash: line 1: streamlit: command not found
[K[?25h^C
