In [1]:
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

from dotenv import load_dotenv, find_dotenv
import pandas as pd
import tiktoken
import re
import os

In [2]:
# Locate a .env file with find_dotenv() and load the variables it defines
load_dotenv(dotenv_path=find_dotenv())

# Name of the OpenAI embedding model; the embedding client below is built from it
EMBEDDING_MODEL_NAME = "text-embedding-3-small"
embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL_NAME)

In [3]:
def load_lyrics_from_folder(folder_path):
    """
    Load lyrics from a root folder of text files.

    Every ``*.txt`` file found recursively under ``folder_path`` is read as
    UTF-8. Two metadata entries are derived from each file's ``source`` path:
    ``artist`` (name of the directory that directly contains the file) and
    ``title`` (file name without its trailing ``.txt`` extension).

    Parameters:
    ----------
    folder_path : str
        The path to the root folder containing the text files.

    Returns:
    -------
    list of Document
        A list of documents, each containing the text from one of the text files,
        with ``artist`` and ``title`` added to its metadata.
    """

    loader = DirectoryLoader(
        path=folder_path,
        glob="**/*.txt",
        loader_cls=TextLoader,
        loader_kwargs={"encoding": "utf8"},
    )
    docs = loader.load()

    for doc in docs:
        source_path = doc.metadata["source"]
        file_name = os.path.basename(source_path)

        # Strip only the trailing extension: str.replace(".txt", "") would also
        # delete a ".txt" that occurs in the middle of the file name and would
        # miss an upper-case ".TXT".
        stem, extension = os.path.splitext(file_name)
        doc.metadata["title"] = stem if extension.lower() == ".txt" else file_name

        # The artist is the name of the folder that directly contains the file
        doc.metadata["artist"] = os.path.basename(os.path.dirname(source_path))

    return docs

In [None]:
### Load the lyrics from the root folder

# Root folder that holds the lyric text files
lyrics_path = "lyrics"

# Read every lyric file below the root folder into a list of documents
docs_lyrics = load_lyrics_from_folder(folder_path=lyrics_path)
print(f"Number of documents with lyrics {len(docs_lyrics)}")

In [5]:
### Remove duplicates based on page_content column and keep the first one based on title and artist length (shortest)

# Create a DataFrame from the documents.
# The explicit column list keeps the frame well-formed when no documents were loaded.
df_docs = pd.DataFrame(
    [
        {
            "artist": doc.metadata["artist"],
            "title": doc.metadata["title"],
            "page_content": doc.page_content,
            "source": doc.metadata["source"],
        }
        for doc in docs_lyrics
    ],
    columns=["artist", "title", "page_content", "source"],
)

# Collect every row whose page_content occurs more than once and rank the rows
# sharing the same page_content by title length, then artist length (shortest first).
# A plain multi-column sort is used instead of groupby().apply(..., include_groups=False),
# because the include_groups argument only exists in pandas >= 2.2.
duplicates = (
    df_docs[df_docs.duplicated("page_content", keep=False)]
    .assign(
        title_len=lambda frame: frame["title"].map(len),
        artist_len=lambda frame: frame["artist"].map(len),
    )
    .sort_values(["page_content", "title_len", "artist_len"])
)

# Keep the best-ranked row of every group; all remaining rows are the ones to remove
duplicates = duplicates[duplicates.duplicated("page_content", keep="first")]
duplicate_sources = set(duplicates["source"])

# Remove the marked duplicates from the file system
for source in duplicates["source"]:
    # The file may already be gone when this cell is run a second time
    if os.path.exists(source):
        os.remove(source)
        print(f"Removed {source}")

# Drop the removed files from the in-memory data as well. Otherwise the cells
# below would still process documents whose files no longer exist (and would
# only give the intended result after re-running the loading cell by hand).
docs_lyrics = [
    doc for doc in docs_lyrics if doc.metadata["source"] not in duplicate_sources
]
df_docs = df_docs[~df_docs["source"].isin(duplicate_sources)].reset_index(drop=True)

In [6]:
### Delete non-english documents with non-ascii characters in the page_content column

# Define the pattern to match non-ascii characters
# NOTE(review): this matches any character outside U+0000-U+007F, so lyrics that
# contain e.g. accented letters or typographic quotes are removed too.
pattern = r"[^\u0000-\u007F]"

# Find all documents with non-ascii characters in the page_content column and remove them
non_ascii_sources = set()
for doc in df_docs.itertuples():
    if re.search(pattern, doc.page_content):
        non_ascii_sources.add(doc.source)
        try:
            os.remove(doc.source)
        except FileNotFoundError:
            # The file may already be gone (e.g. removed as a duplicate earlier,
            # or by a previous run of this cell); there is nothing left to delete.
            continue
        print(f"Removed {doc.source}")

# Keep the in-memory documents in sync with the file system so that the removed
# lyrics are not split and embedded by the cells below.
docs_lyrics = [
    doc for doc in docs_lyrics if doc.metadata["source"] not in non_ascii_sources
]

In [None]:
### Split the documents into smaller chunks

# The splitter is configured with a chunk size of 300 characters and an overlap of 150 characters.
# Separators are given in priority order: "\n\n" (paragraphs) first, then "\n" (single lines),
# so lyrics are cut at paragraph boundaries where possible and at line boundaries otherwise.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=300,
    chunk_overlap=150,
    separators=["\n\n", "\n"],
)
documents = text_splitter.split_documents(docs_lyrics)
print(f"Number of documents after splitting {len(documents)}")

In [None]:
### Find duplicates in the split documents based on source and page_content columns and remove them from the documents list

# Build one row per chunk; the default integer index equals the chunk's position in `documents`
df_docs = pd.DataFrame(
    [
        {
            "artist": chunk.metadata["artist"],
            "title": chunk.metadata["title"],
            "page_content": chunk.page_content,
            "source": chunk.metadata["source"],
        }
        for chunk in documents
    ]
)

# Flag every repeat of a (source, page_content) pair; the first occurrence stays unflagged
is_repeat = df_docs.duplicated(["source", "page_content"])
duplicates = df_docs[is_repeat]
print(
    f"Number of documents before removing duplicates {len(documents)} and number of duplicates {len(duplicates)}"
)

# Keep only the chunks that were not flagged as repeats
documents = [chunk for chunk, repeated in zip(documents, is_repeat) if not repeated]
print(f"Number of documents after removing duplicates {len(documents)}")

In [None]:
### Calculate the total number of tokens to estimate the cost

# Look up the tiktoken encoding registered for the embedding model
encoding = tiktoken.encoding_for_model(EMBEDDING_MODEL_NAME)

# Add up the token counts of all chunks
tokens = sum(len(encoding.encode(chunk.page_content)) for chunk in documents)
print(f"Total number of tokens: {tokens}")

In [10]:
# ### Save the documents to the Chroma database

# # NOTE: this cell is commented out (disabled); uncomment the call below to build the database.
# # The embeddings are calculated using the OpenAI embeddings model
# # The database is saved in the chroma_db folder with the collection name "lyrics"
# Chroma.from_documents(
#     documents=documents,
#     embedding=embeddings,
#     persist_directory="./chroma_db",
#     collection_name="lyrics",
# )