# A Vector Database of Classic English Literature

In the [previous notebook](https://github.com/tommyliphysics/tommyli-ml/blob/main/literature_vdb/notebooks/create.ipynb) we created a vector database of English texts downloaded from [Project Gutenberg](https://www.gutenberg.org). We will now look at accessing this database to perform a vector search and add new texts. We will do this by creating a class called LiteratureSearch with the methods add() (for adding a book to the database) and search() (for querying the database), together with save() (for writing the database back to file) and close() (for releasing its resources).

In [1]:
!pip install pyspark sentence_transformers faiss-cpu



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

from sentence_transformers import SentenceTransformer
import numpy as np
import pandas as pd
import faiss
import time

import gc

import urllib.request
import regex

def read_book(url):
    try:
        response = urllib.request.urlopen(url)
    except Exception as e:
        print(e)
    else:
        if response is None:
            print("Error: no response.")
            return None, None

        content = response.read().decode("utf-8")
        content = regex.sub(r'\r\n', '\n', content)
        if len(regex.findall(r'\r', content)) > 0:
            print("Warning: found stray carriage return.")
        book_title = regex.findall("Title: (.*?)[\n]", content)
        author = regex.findall("Author: (.*?)[\n]", content)

        if len(book_title) == 0:
            print("Could not find name of book.")
            book_title = None
        else:
            book_title = book_title[0]
            print("Book title: ", book_title)

        if len(author) == 0:
            print("Could not find name of author.")
            author = None
        else:
            author = author[0]
            print("author: ", author)
        if len(regex.findall(r'[\*]+[\s]+START OF THE PROJECT GUTENBERG EBOOK', content)) > 0:
            content = regex.split(r'[\*]+[\s]+START OF THE PROJECT GUTENBERG EBOOK[^\*]+[\*]+', content)[1]
            if len(regex.findall(r'[\*]+[\s]+END OF THE PROJECT GUTENBERG EBOOK', content)) > 0:
                content = regex.split(r'[\*]+[\s]+END OF THE PROJECT GUTENBERG EBOOK[^\*]+[\*]+', content)[0]

        blocks = regex.split(r'[\n]{2,}', content)

        samples = []
        for block in blocks:
            block = regex.sub(r'[\n]+', ' ', block)
            block = regex.sub(r'[\s]+', ' ', block)
            if len(block) > 0:
                samples.append({'author': author, 'title': book_title, 'text': block})
        response.close()

        return samples, book_title, author

class LiteratureSearch:
    """Vector-search front end over a collection of book paragraphs.

    Three structures are kept side by side: a Spark DataFrame with the
    paragraph text and metadata (self.df), a NumPy array of sentence
    embeddings (self.embeddings) and a FAISS index (self.index_L2).
    search() looks up DataFrame rows by the ids returned from the index, so
    the 'id' column is assumed to match the row's position in the index.
    """

    def __init__(self, books_fn, embeddings_fn, index_fn):
        """Load the text, embeddings and search index from file.

        Args:
            books_fn: Path of the CSV (with header) holding the paragraphs.
            embeddings_fn: Path of the .npy file holding the embeddings.
            index_fn: Path of the FAISS index file.
        """
        # load the Spark DataFrame
        self.spark = SparkSession.builder.appName("Read").getOrCreate()
        self.df = self.spark.read.csv(books_fn, header=True, inferSchema=True)

        # add time stamp for creation
        if 'added_date' not in self.df.columns:
            self.df = self.df.withColumn('added_date', lit(time.time()))

        self.df.show()

        # get list of authors and book titles with a single Spark job
        pairs = self.df.select('author', 'title').distinct().collect()
        # Explicit columns keep the lookups in add() valid for an empty corpus.
        self.author_list = pd.DataFrame(
            [{'author': row['author'], 'title': row['title']} for row in pairs],
            columns=['author', 'title'],
        )

        print("Found titles:")
        for row in self.author_list.to_dict(orient='records'):
            print(f"{row['title']}\tby {row['author']}")

        # load the sentence transformer model
        self.sentence_transformer_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        # load embeddings
        self.embeddings = np.load(embeddings_fn)
        # load search index
        self.index_L2 = faiss.read_index(index_fn)

        # next free id; assumes the existing ids are 0 .. count-1
        self.max_id = self.df.count()

    def _has_title(self, title, author):
        """Return True if this (title, author) pair is already in the collection."""
        known = self.author_list
        return bool(((known['author'] == author) & (known['title'] == title)).any())

    def _register_title(self, title, author):
        """Record a newly added (title, author) pair for later duplicate checks."""
        new_row = pd.DataFrame([{'author': author, 'title': title}],
                               columns=['author', 'title'])
        self.author_list = pd.concat([self.author_list, new_row], ignore_index=True)

    def add(self, url):
        """Download a book and add its paragraphs to the database.

        Args:
            url: Location of the Project Gutenberg plain-text file.

        Returns:
            -1 if the book could not be read or is already in the collection,
            otherwise None.
        """
        fetched = read_book(url)
        # read_book signals failure with None values (or no tuple at all).
        if not fetched or len(fetched) != 3 or not fetched[0]:
            print(f"Error: could not read any text from {url}.")
            return -1
        new_samples, title, author = fetched

        if self._has_title(title, author):
            print(f"Error: {title} by {author} already exists in the collection.")
            return -1

        current_time = time.time()
        for offset, sample in enumerate(new_samples):
            sample['id'] = self.max_id + offset
            sample['added_date'] = current_time

        # Encode before touching any state, so that a failure here leaves the
        # DataFrame, embeddings, index and id counter consistent.
        new_texts = [sample['text'] for sample in new_samples]
        new_embeddings = np.ascontiguousarray(
            self.sentence_transformer_model.encode(new_texts), dtype=np.float32)

        new_samples_df = self.spark.createDataFrame(new_samples)
        # Match columns by name: the column order inferred from the dicts need
        # not be the column order of the CSV, and union() is positional.
        self.df = self.df.unionByName(new_samples_df)
        print(f"Added {len(new_samples)} new samples to the pyspark dataframe.")

        self.embeddings = np.concatenate((self.embeddings, new_embeddings))
        print(f"Added new embeddings (new shape: {self.embeddings.shape}).")
        self.index_L2.add(new_embeddings)
        print(f"Added to index (new size: {self.index_L2.ntotal}).")

        self.max_id += len(new_samples)
        self._register_title(title, author)

    def save(self, books_fn, embeddings_fn, index_fn):
        """Write the text, embeddings and search index to the given paths.

        Args:
            books_fn: Output path for the CSV (Spark writes a directory).
            embeddings_fn: Output path for the embeddings (.npy).
            index_fn: Output path for the FAISS index.
        """
        # NOTE(review): books_fn may be the same path self.df was lazily read
        # from; confirm that Spark allows overwriting it in that case.
        self.df.repartition(1).write.mode('overwrite').csv(books_fn, header=True)
        np.save(embeddings_fn, self.embeddings)
        faiss.write_index(self.index_L2, index_fn)

    def search(self, query_text, k):
        """Return the k paragraphs closest to query_text.

        Args:
            query_text: Text to embed and search for.
            k: Number of nearest neighbours to retrieve.

        Returns:
            A list of dicts (one per matching row, DataFrame columns as keys)
            with an extra 'result' key giving the 1-based rank, sorted by rank.
        """
        query_vector = self.sentence_transformer_model.encode(query_text)
        distances, sorted_ids = self.index_L2.search(np.array([query_vector]), k)

        sorted_ids = sorted_ids[0].tolist()
        # rank of each id in the FAISS result (1 = closest)
        rank = {}
        for position, found_id in enumerate(sorted_ids, start=1):
            rank.setdefault(found_id, position)

        results = self.df.filter(self.df.id.isin(sorted_ids)).toPandas()
        results['result'] = results['id'].map(rank)
        return results.sort_values(by='result').to_dict(orient='records')

    def close(self):
        """Release the index and embeddings and stop the Spark session."""
        del self.index_L2
        del self.embeddings
        self.spark.stop()
        print("Stopped Spark session.")
        gc.collect()

  from tqdm.autonotebook import tqdm, trange


Let's create an instance of LiteratureSearch, which will load the text as a Spark DataFrame, embeddings and search index.

In [3]:
litsearch = LiteratureSearch('books.csv', 'embeddings.npy', 'index_L2.index')

  self.pid = _posixsubprocess.fork_exec(


+---------------+---+--------------------+--------------------+-------------------+
|         author| id|                text|               title|         added_date|
+---------------+---+--------------------+--------------------+-------------------+
|Charles Dickens|  0|A TALE OF TWO CITIES|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  1|A STORY OF THE FR...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  2|  By Charles Dickens|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  3|            CONTENTS|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  4|Book the First--R...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  5|CHAPTER I The Per...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  6|Book the Second--...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  7|CHAPTER I Five Ye...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  8|Book the Third--t...|A Tale of Two Cities|1.72416749416

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


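Let's add a few more books to the database. If a book is already in the collection, add() prints an error and returns -1 instead of adding it again: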

In [4]:
litsearch.add('https://www.gutenberg.org/cache/epub/730/pg730.txt')

Book title:  Oliver Twist
author:  Charles Dickens
Error: Oliver Twist by Charles Dickens already exists in the collection.


-1

In [5]:
litsearch.add('https://www.gutenberg.org/cache/epub/766/pg766.txt')

Book title:  David Copperfield
author:  Charles Dickens
Error: David Copperfield by Charles Dickens already exists in the collection.


-1

We can now save the database to file.

In [6]:
litsearch.save('books.csv', 'embeddings.npy', 'index_L2.index')

Let's now close the current instance and load a new instance of LiteratureSearch to see if our new books are in the database.

In [7]:
litsearch.close()

Stopped Spark session.


In [8]:
litsearch_new = LiteratureSearch('books.csv', 'embeddings.npy', 'index_L2.index')

+---------------+---+--------------------+--------------------+-------------------+
|         author| id|                text|               title|         added_date|
+---------------+---+--------------------+--------------------+-------------------+
|Charles Dickens|  0|A TALE OF TWO CITIES|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  1|A STORY OF THE FR...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  2|  By Charles Dickens|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  3|            CONTENTS|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  4|Book the First--R...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  5|CHAPTER I The Per...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  6|Book the Second--...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  7|CHAPTER I Five Ye...|A Tale of Two Cities|1.724167494169181E9|
|Charles Dickens|  8|Book the Third--t...|A Tale of Two Cities|1.72416749416

We see that the saved database loads successfully (in the run shown above both books were already in the collection, so no new rows were added). Let's now perform a vector search.

In [9]:
litsearch_new.search("Let him cook!", 10)

[{'author': 'Charles Dickens',
  'id': 12577,
  'text': '‘Can you cook this young gentleman’s breakfast for him, if you please?’ said the Master at Salem House.',
  'title': 'David Copperfield',
  'added_date': 1724167494.169181,
  'result': 1},
 {'author': 'Charles Dickens',
  'id': 4338,
  'text': 'Must they! Let them not hope to taste it!',
  'title': 'Great Expectations',
  'added_date': 1724167494.169181,
  'result': 2},
 {'author': 'Charles Dickens',
  'id': 12526,
  'text': '‘What have we got here?’ he said, putting a fork into my dish. ‘Not chops?’',
  'title': 'David Copperfield',
  'added_date': 1724167494.169181,
  'result': 3},
 {'author': 'Charles Dickens',
  'id': 15213,
  'text': 'What with the novelty of this cookery, the excellence of it, the bustle of it, the frequent starting up to look after it, the frequent sitting down to dispose of it as the crisp slices came off the gridiron hot and hot, the being so busy, so flushed with the fire, so amused, and in the midst of

Once we are done with the database, we need to close it.  

In [10]:
litsearch_new.close()

Stopped Spark session.


In this series of notebooks we've seen how to create a vector database using a combination of PySpark, a sentence transformer and FAISS. Another option is to use existing vector search capabilities in cloud database services, e.g. AWS OpenSearch or MongoDB Atlas. The procedure there is similar to the one described here, but with a few differences: rather than storing the text and embeddings in separate files, one can store the embeddings as well as the text as attributes within the same collection. A search index can then be created using cloud services, and a vector search can be performed via an API.