In [2]:
import os

In [3]:
import shutil

In [4]:
def process_and_copy_files(source_dir, target_dir):
    """Copy every ``.txt`` file under ``source_dir`` into ``target_dir`` without its first line.

    The source tree is walked recursively. Each file whose name ends with
    ``".txt"`` is read as UTF-8, its first line (assumed to be metadata) is
    dropped, and the remaining lines are written to a file with the same
    name directly inside ``target_dir``.

    Notes:
        * The output is flat: files that share a base name in different
          subdirectories map to the same target path, so the one visited
          last overwrites the others.
        * When ``target_dir`` lies inside ``source_dir`` it is excluded from
          the walk. Otherwise the files written a moment ago would be picked
          up again and lose a second line.

    Args:
        source_dir (str): Root directory that is searched for text files.
        target_dir (str): Directory receiving the processed files. It is
            created (including parents) when it does not exist.

    Returns:
        None
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(target_dir, exist_ok=True)
    target_real = os.path.realpath(target_dir)

    for root, dirs, files in os.walk(source_dir):
        # Prune the output directory in place so os.walk never descends into it.
        dirs[:] = [
            name
            for name in dirs
            if os.path.realpath(os.path.join(root, name)) != target_real
        ]

        for file in files:
            if not file.endswith(".txt"):
                continue

            source_path_raw = os.path.join(root, file)
            target_path_raw = os.path.join(target_dir, file)

            with open(source_path_raw, "r", encoding="utf-8") as source_file:
                lines = source_file.readlines()

            # Slicing an empty list is safe, so empty files yield empty outputs.
            with open(target_path_raw, "w", encoding="utf-8") as target_file:
                target_file.writelines(lines[1:])

In [5]:
import random

In [6]:
def extract_words_from_files(target_dir, num_files=100, num_words=200, seed=None):
    """Return the leading words of a random selection of text files.

    Only files directly inside ``target_dir`` whose name ends with ``".txt"``
    are considered. Up to ``num_files`` of them are sampled without
    replacement; files that are empty or contain only whitespace are skipped
    after sampling, so the result can be shorter than ``num_files``.

    Args:
        target_dir (str): Directory containing the text files.
        num_files (int, optional): Maximum number of files to sample.
            Defaults to 100.
        num_words (int, optional): Maximum number of whitespace-separated
            words kept per file. Defaults to 200.
        seed (int, optional): When given, a private ``random.Random(seed)``
            drives the sampling so the selection is reproducible. When
            ``None`` (default) the module-level ``random`` state is used.

    Returns:
        list: ``(title, text)`` tuples where ``title`` is the file name
        without its extension and ``text`` is the first ``num_words`` words
        joined by single spaces.
    """
    # os.listdir order is platform-dependent; sorting makes the candidate
    # list stable so that a given seed always selects the same files.
    all_files = sorted(
        os.path.join(target_dir, name)
        for name in os.listdir(target_dir)
        if name.endswith(".txt")
    )

    rng = random if seed is None else random.Random(seed)
    selected_files = rng.sample(all_files, min(num_files, len(all_files)))

    extracted_data = []
    for file_path in selected_files:
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()

        words = content.split()
        if not words:  # empty or whitespace-only file
            continue

        song_title = os.path.splitext(os.path.basename(file_path))[0]
        extracted_data.append((song_title, " ".join(words[:num_words])))

    return extracted_data

In [7]:
from sentence_transformers import SentenceTransformer

In [8]:
# Sentence-embedding model shared by the cells below: it is passed to
# generate_vectors() to encode the lyrics and to the search function to
# encode queries.
model = SentenceTransformer("all-mpnet-base-v2")

In [9]:
def generate_vectors(text_data, model):
    """Encode song texts and pair every resulting vector with its title.

    Args:
        text_data (list): ``(title, text)`` tuples, one per song.
        model (object): Object exposing
            ``encode(texts, batch_size=..., show_progress_bar=...)`` that
            returns one vector per input text.

    Returns:
        list: ``(title, vector)`` tuples, pairing the i-th title with the
        i-th vector returned by ``model.encode``.
    """
    # Two passes over the input keep texts and titles as parallel lists.
    lyrics = [body for _title, body in text_data]
    names = [name for name, _body in text_data]

    # Batch size and the progress bar are fixed here rather than exposed.
    embeddings = model.encode(lyrics, batch_size=16, show_progress_bar=True)

    return list(zip(names, embeddings))

In [10]:
import faiss

In [11]:
import numpy as np

In [12]:
def create_faiss_index_with_titles(song_vectors):
    """Build a flat L2 FAISS index over unit-normalised song vectors.

    Args:
        song_vectors (list): ``(title, vector)`` pairs. All vectors must
            have the same length.

    Returns:
        dict: ``{"index": index, "titles": titles}`` where ``index`` is a
        ``faiss.IndexFlatL2`` and ``titles[i]`` is the title of the i-th
        vector added to it.

    Raises:
        ValueError: If ``song_vectors`` is empty.
    """
    if len(song_vectors) == 0:
        raise ValueError("song_vectors is empty; cannot build an index without vectors")

    titles = [item[0] for item in song_vectors]
    vectors = np.array([item[1] for item in song_vectors]).astype("float32")  # FAISS expects float32

    # Scale every row to unit length. An all-zero row has norm 0; dividing it
    # by 1 instead leaves it at zero rather than filling the index with NaN.
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    vectors = vectors / norms

    dimension = vectors.shape[1]
    index = faiss.IndexFlatL2(dimension)  # exact (brute-force) L2 search
    index.add(vectors)

    return {"index": index, "titles": titles}

In [13]:
def search_songs_by_word_or_phrase(query, extracted_data, faiss_index, model, top_k=10, similarity_threshold=0.9):
    """Return the indexed songs most similar to ``query``.

    The query is encoded with ``model``, scaled to unit length and looked up
    in the L2 index. FAISS L2 indexes report *squared* Euclidean distance;
    for two unit vectors that equals ``2 - 2 * cos``, so the cosine
    similarity is recovered as ``1 - distance / 2`` (range -1 to 1).

    Args:
        query (str): The word or phrase to search for.
        extracted_data (list): Unused; kept so existing callers keep working.
        faiss_index (dict): ``{"index": ..., "titles": ...}`` as built by
            ``create_faiss_index_with_titles`` (unit-normalised vectors).
        model: Object with ``encode(list_of_str, show_progress_bar=...)``
            returning one vector per input string.
        top_k (int): Number of nearest neighbours requested from the index.
        similarity_threshold (float): Minimum cosine similarity a hit must
            reach to be returned.

    Returns:
        list: ``(title, similarity)`` tuples, nearest first, containing only
        hits whose similarity is at least ``similarity_threshold``.
    """
    query_vector = model.encode([query], show_progress_bar=False)[0]

    # Unit-normalise so the distance/cosine relation above holds; skip the
    # division for an all-zero vector to avoid producing NaN.
    norm = np.linalg.norm(query_vector)
    if norm > 0:
        query_vector = query_vector / norm

    distances, indices = faiss_index["index"].search(
        np.array([query_vector]).astype("float32"), top_k
    )

    titles = faiss_index["titles"]
    results = []
    for distance, position in zip(distances[0], indices[0]):
        # FAISS pads with -1 when the index holds fewer than top_k vectors;
        # without this guard -1 would silently select the last title.
        if position < 0:
            continue

        similarity = 1.0 - float(distance) / 2.0  # squared L2 -> cosine similarity
        if similarity >= similarity_threshold:
            results.append((titles[position], similarity))

    return results

In [14]:
# Root of the raw lyric files that process_and_copy_files() walks
# (relative path, resolved against the kernel's working directory).
source_dir = "../raw_data/Taylor-Swift-Lyrics/data/Albums"

In [15]:
# Output directory for the processed .txt files; extract_words_from_files()
# reads from here afterwards.
target_dir = "../data"

In [16]:
# Drop the first line of every .txt file under source_dir and write the
# results into target_dir.
process_and_copy_files(source_dir, target_dir)

In [17]:
# Sample text files from target_dir with the function defaults (up to 100
# files, first 200 words each). The sample is random and unseeded, so the
# selection differs between runs.
extracted_data = extract_words_from_files(target_dir)


In [18]:
# Encode each extracted text with the model; the result is a list of
# (title, vector) pairs.
song_vectors = generate_vectors(extracted_data, model)


Batches:   0%|          | 0/7 [00:00<?, ?it/s]

In [19]:
# Build the FAISS index together with the parallel list of titles;
# returned as a dict with the keys "index" and "titles".
faiss_index = create_faiss_index_with_titles(song_vectors)

In [20]:
# Free-text query that is embedded and compared against the indexed lyrics.
query = "Oh, oh"

In [21]:
# Minimum similarity a hit must reach. The search derives similarity by
# subtracting a non-negative distance term from 1, so it can never exceed 1;
# the previous value of 4 therefore rejected every result. Tune as needed.
similarity_threshold = 0.4

In [22]:
# Number of nearest neighbours requested from the index per query.
top_k = 10

In [23]:
# Run the search; the result is a list of (title, similarity) tuples that
# reached similarity_threshold.
results = search_songs_by_word_or_phrase(query, extracted_data, faiss_index, model, top_k, similarity_threshold)


In [24]:
# Report the hits, or say so explicitly when nothing reached the threshold.
if not results:
    print(f"No songs found containing the word/phrase: '{query}'")
else:
    print(f"Top {top_k} songs containing the word/phrase '{query}':")
    for title, similarity in results:
        print(f"Title: {title}, Similarity: {similarity:.4f}")

No songs found containing the word/phrase: 'Oh, oh'


In [301]:
# Sanity check: how many vectors does the FAISS index currently hold?
print(f"Number of vectors in FAISS index: {faiss_index['index'].ntotal}")

# Dump up to the first five stored vectors (fewer when the index is smaller)
# to confirm that the index contains real embeddings.
print("First few vectors in FAISS index:")
print(faiss_index["index"].reconstruct_n(0, min(5, faiss_index["index"].ntotal)))


Number of vectors in FAISS index: 100
First few vectors in FAISS index:
[[-0.05917441 -0.03241466  0.04516074 ... -0.04496421 -0.01286864
  -0.04903262]
 [-0.03611477 -0.05216329  0.06939048 ... -0.01350493 -0.00953179
  -0.09115995]
 [ 0.00309049 -0.03903301  0.12974522 ...  0.0284683  -0.08128463
  -0.12803575]
 [-0.07103611 -0.03646065  0.07148556 ...  0.02152741 -0.08043654
  -0.09750905]
 [-0.02360871  0.00433577  0.08493806 ... -0.02512681 -0.06737459
  -0.09258062]]


In [302]:
# Generate and normalize the query vector.
# NOTE: this rebinds the global `query` set in the earlier search cell, so
# re-running that search afterwards will use this text instead.
query = "example query text"  # Replace with your actual query
query_vector = model.encode([query], show_progress_bar=False)[0]
query_vector = query_vector / np.linalg.norm(query_vector)

# Print the full query vector for visual inspection
print("Query vector:")
print(query_vector)

# Confirm the normalisation worked: the L2 norm should be 1.0
print("Query vector norm (should be 1.0):", np.linalg.norm(query_vector))


Query vector:
[ 4.82537672e-02  9.93916541e-02 -3.55676226e-02  8.95517617e-02
 -9.60402414e-02  5.57974875e-02  1.40093565e-01  5.98580576e-02
 -2.33554896e-02 -2.70822775e-02  6.63445294e-02 -3.07279117e-02
  8.83587599e-02 -9.00981724e-02  1.27834370e-02  5.06016612e-02
  4.08293717e-02  8.46594572e-03 -5.18636443e-02 -3.23123559e-02
  6.49143010e-02  1.18162930e-01 -1.20536806e-02  4.28744145e-02
 -1.05945803e-02  3.47838774e-02 -2.40301285e-02  2.77031772e-02
  7.05039427e-02  1.78749692e-02 -9.39764827e-02  3.10457163e-02
  5.64716905e-02  1.28426880e-01  4.35699560e-02 -4.35039252e-02
 -3.21654417e-02 -2.06050277e-03 -2.66786013e-02 -3.62970494e-02
 -1.22207971e-02 -7.05498978e-02  2.39946209e-02 -4.86965803e-03
  2.48031989e-02 -5.21616340e-02 -9.84121114e-02  2.39161085e-02
  4.46803756e-02 -3.05246133e-02 -1.09827951e-01 -2.39416361e-02
 -5.17065600e-02 -1.04688443e-02  2.87273340e-02  4.65779491e-02
 -1.09065123e-01  3.19293089e-04 -2.41400711e-02 -3.14146616e-02
  5.8078172

In [303]:
# Query the index directly for the 5 nearest neighbours of query_vector,
# bypassing search_songs_by_word_or_phrase() so the unfiltered values are visible.
# NOTE(review): IndexFlatL2 is documented as returning squared L2 distances -
# confirm before reading these numbers as similarities.
distances, indices = faiss_index["index"].search(np.array([query_vector]).astype("float32"), 5)

# Print the raw distances and the matching row indices
print("Distances from FAISS search:")
print(distances)
print("Indices from FAISS search:")
print(indices)

Distances from FAISS search:
[[1.7476499 1.7500359 1.7580174 1.7828362 1.7857184]]
Indices from FAISS search:
[[88 18 33 54 89]]


In [1]:
pip freeze>requirements.txt

Note: you may need to restart the kernel to use updated packages.
