<a href="https://colab.research.google.com/github/osaeed-ds/FieldLearnings/blob/main/FieldLearning_ModifyModule_Langchain2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Field Learning - Langchain - Modify Module**





## **Introduction**

In this exercise we are going to modify a LangChain module.

In particular we are going to modify the Cassandra class.  

Currently, `Cassandra.from_documents` does not support supplying IDs for the rows, so row IDs are auto-generated UUIDs. You may have a scenario where you have a meaningful primary key and want to use it as the key in the underlying Cassandra table. The downstream method, `Cassandra.add_texts`, already accepts an optional list of IDs. In this exercise, we modify `Cassandra.from_documents` (and `Cassandra.from_texts`, which it calls) to accept an optional list of IDs and pass it along.
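
The change amounts to threading one optional argument through a chain of constructors. A minimal, self-contained sketch of that pattern follows. `ToyStore` is a hypothetical in-memory stand-in, not the LangChain class; it only mirrors the method names discussed above.

```python
import uuid
from typing import Dict, List, Optional


class ToyStore:
    """Hypothetical in-memory stand-in for a vector store (not LangChain's Cassandra)."""

    def __init__(self) -> None:
        self.rows: Dict[str, str] = {}

    def add_texts(self, texts: List[str], ids: Optional[List[str]] = None) -> List[str]:
        # Fall back to generated UUIDs only when the caller supplies no IDs.
        if ids is None:
            ids = [uuid.uuid4().hex for _ in texts]
        for row_id, text in zip(ids, texts):
            self.rows[row_id] = text
        return ids

    @classmethod
    def from_texts(cls, texts: List[str], ids: Optional[List[str]] = None) -> "ToyStore":
        store = cls()
        # Forward the optional IDs instead of dropping them.
        store.add_texts(texts, ids=ids)
        return store


auto = ToyStore.from_texts(["a", "b"])  # keys are generated UUID hex strings
keyed = ToyStore.from_texts(["a", "b"], ids=["doc-1", "doc-2"])
print(sorted(keyed.rows))  # ['doc-1', 'doc-2']
```

The real modification later in this notebook has the same shape: the new `ids` parameter defaults to `None`, so existing callers keep the auto-generated behaviour.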


## **Prerequisites Setup**


* Follow [these steps](https://docs.datastax.com/en/astra-serverless/docs/vector-search/overview.html) to create a new vector search enabled database in Astra.
* Generate a new ["Database Administrator" token](https://docs.datastax.com/en/astra-serverless/docs/manage/org/manage-tokens.html)
* Download the secure connect bundle for the database you just created (you can do this from the "Connect" tab of your database).
* You will also need the necessary secret for the LLM provider of your choice:
 - If OpenAI, then you will need an [OpenAI API Key](https://help.openai.com/en/articles/4936850-where-do-i-find-my-secret-api-key). This requires an OpenAI account with billing enabled.
 - If Vertex AI, you will need a service account JSON file.
 - For more details, see [Pre-requisites](https://cassio.org/start_here/#llm-access) on cassio.org.


In [1]:
# install required dependencies
! pip install datasets google-cloud-aiplatform openai pandas tiktoken langchain cassio



You may be asked to "Restart the Runtime" at this time, as some dependencies
have been upgraded. **Please do restart the runtime now** for a smoother execution from this point onward.

## **Astra DB Setup**
The following steps will ask for the keyspace of the vector-search-enabled Astra DB that you want to use for this example, as well as the Astra DB token that you generated as part of the prerequisites. You will also need to upload the [Secure Connect Bundle](https://awesome-astra.github.io/docs/pages/astra/download-scb/#c-procedure).
Lastly, we create helper functions for a secure connection to Astra DB: `getCQLSession`, `getCQLKeyspace`, and `getTableCount`.

In [2]:
# Input your database keyspace name:
ASTRA_DB_KEYSPACE = input('Your Astra DB Keyspace name: ')

Your Astra DB Keyspace name: vector_preview


In [3]:
from getpass import getpass
# Input your Astra DB token string, the one starting with "AstraCS:..."
ASTRA_DB_TOKEN_BASED_PASSWORD = getpass('Your Astra DB Token: ')

Your Astra DB Token: ··········


In [4]:
# Upload your Secure Connect Bundle zipfile:
import os
from google.colab import files


print('Please upload your Secure Connect Bundle')
uploaded = files.upload()
if uploaded:
    astraBundleFileTitle = list(uploaded.keys())[0]
    ASTRA_DB_SECURE_BUNDLE_PATH = os.path.join(os.getcwd(), astraBundleFileTitle)
else:
    raise ValueError(
        'Cannot proceed without Secure Connect Bundle. Please re-run the cell.'
    )

Please upload your Secure Connect Bundle


Saving secure-connect-osaeed-vector.zip to secure-connect-osaeed-vector.zip


In [5]:
# colab-specific override of helper functions
from cassandra.cluster import (
    Cluster,
)
from cassandra.auth import PlainTextAuthProvider

# The "username" is the literal string 'token' for this connection mode:
ASTRA_DB_TOKEN_BASED_USERNAME = 'token'


def getCQLSession(mode='astra_db'):
    if mode == 'astra_db':
        cluster = Cluster(
            cloud={
                "secure_connect_bundle": ASTRA_DB_SECURE_BUNDLE_PATH,
            },
            auth_provider=PlainTextAuthProvider(
                ASTRA_DB_TOKEN_BASED_USERNAME,
                ASTRA_DB_TOKEN_BASED_PASSWORD,
            ),
        )
        astraSession = cluster.connect()
        return astraSession
    else:
        raise ValueError('Unsupported CQL Session mode')

def getCQLKeyspace(mode='astra_db'):
    if mode == 'astra_db':
        return ASTRA_DB_KEYSPACE
    else:
        raise ValueError('Unsupported CQL Session mode')

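def getTableCount(session, keyspace, table_name):
    # imported here so the helper does not depend on a later cell
    from cassandra.query import SimpleStatement

    # create a query that counts the number of records of the Astra DB table
    query = SimpleStatement(f"SELECT COUNT(*) FROM {keyspace}.{table_name};")

    # execute the query and return the single count value
    results = session.execute(query)
    return results.one().count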


## **LLM Provider**

In the cell below you can choose between **GCP VertexAI** or **OpenAI** for your LLM services.
(See [Pre-requisites](https://cassio.org/start_here/#llm-access) on cassio.org for more details).

Make sure you set the `llmProvider` variable and supply the corresponding access secrets in the following cell.

In [6]:
# Set your secret(s) for LLM access:
llmProvider = 'OpenAI'  # 'GCP_VertexAI'


In [7]:
if llmProvider == 'OpenAI':
    apiSecret = getpass(f'Your secret for LLM provider "{llmProvider}": ')
    os.environ['OPENAI_API_KEY'] = apiSecret
elif llmProvider == 'GCP_VertexAI':
    # we need a json file
    print(f'Please upload your Service Account JSON for the LLM provider "{llmProvider}":')
    from google.colab import files
    uploaded = files.upload()
    if uploaded:
        vertexAIJsonFileTitle = list(uploaded.keys())[0]
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(os.getcwd(), vertexAIJsonFileTitle)
    else:
        raise ValueError(
            'No file uploaded. Please re-run the cell.'
        )
else:
    raise ValueError('Unknown/unsupported LLM Provider')

Your secret for LLM provider "OpenAI": ··········


## **Dataset Setup**
We will use the US Constitution as our dataset.

In [8]:
!curl https://www.govinfo.gov/content/pkg/CDOC-110hdoc50/html/CDOC-110hdoc50.htm > constitution.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  291k    0  291k    0     0   285k      0 --:--:--  0:00:01 --:--:--  285k


In [9]:
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.indexes import VectorstoreIndexCreator
from langchain.text_splitter import (
    CharacterTextSplitter,
    RecursiveCharacterTextSplitter,
)
from langchain.docstore.document import Document
from langchain.document_loaders import TextLoader
from langchain.indexes.vectorstore import VectorStoreIndexWrapper

loader = TextLoader("constitution.txt")

documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
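
To make the chunking step concrete, here is a simplified sketch of separator-based splitting: break the text on blank lines, then greedily merge pieces until the next one would exceed `chunk_size`. This is an illustration only. `toy_split` is a hypothetical helper, and LangChain's `CharacterTextSplitter` handles more cases (overlap, custom length functions).

```python
from typing import List


def toy_split(text: str, chunk_size: int, separator: str = "\n\n") -> List[str]:
    """Greedy separator-based chunking (hypothetical helper, for illustration)."""
    chunks: List[str] = []
    current = ""
    for piece in text.split(separator):
        if not piece:
            continue
        candidate = piece if not current else current + separator + piece
        if len(candidate) <= chunk_size or not current:
            # Keep merging; a single oversized piece is kept whole.
            current = candidate
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


sample = "Article I.\n\nSection 1.\n\nAll legislative Powers herein granted..."
print(toy_split(sample, chunk_size=25))
```

With `chunk_size=25` the first two short pieces are merged into one chunk and the longer third piece becomes its own chunk.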





In our example, the "meaningful" primary keys will simply be each chunk's position, as a string. We check how many chunks the input file was split into, then create a list of sequential integers converted to strings.

In [17]:
len(docs)

128

In [21]:
intids = list(range(len(docs)))
stringids = [str(x) for x in intids]
stringids

['0',
 '1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9',
 '10',
 '11',
 '12',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19',
 '20',
 '21',
 '22',
 '23',
 '24',
 '25',
 '26',
 '27',
 '28',
 '29',
 '30',
 '31',
 '32',
 '33',
 '34',
 '35',
 '36',
 '37',
 '38',
 '39',
 '40',
 '41',
 '42',
 '43',
 '44',
 '45',
 '46',
 '47',
 '48',
 '49',
 '50',
 '51',
 '52',
 '53',
 '54',
 '55',
 '56',
 '57',
 '58',
 '59',
 '60',
 '61',
 '62',
 '63',
 '64',
 '65',
 '66',
 '67',
 '68',
 '69',
 '70',
 '71',
 '72',
 '73',
 '74',
 '75',
 '76',
 '77',
 '78',
 '79',
 '80',
 '81',
 '82',
 '83',
 '84',
 '85',
 '86',
 '87',
 '88',
 '89',
 '90',
 '91',
 '92',
 '93',
 '94',
 '95',
 '96',
 '97',
 '98',
 '99',
 '100',
 '101',
 '102',
 '103',
 '104',
 '105',
 '106',
 '107',
 '108',
 '109',
 '110',
 '111',
 '112',
 '113',
 '114',
 '115',
 '116',
 '117',
 '118',
 '119',
 '120',
 '121',
 '122',
 '123',
 '124',
 '125',
 '126',
 '127']
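
Sequential integers keep the example simple. In practice a key is more likely to come from the data itself. One possible scheme, sketched below under the assumption that each chunk carries a `source` entry in its metadata (as `TextLoader` sets), combines the source name with the chunk's position. `Chunk` and `make_ids` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Chunk:
    """Hypothetical stand-in for a LangChain Document."""
    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


def make_ids(chunks: List[Chunk]) -> List[str]:
    # Build keys such as "constitution.txt-0", "constitution.txt-1", ...
    return [
        f"{chunk.metadata.get('source', 'unknown')}-{index}"
        for index, chunk in enumerate(chunks)
    ]


chunks = [
    Chunk("We the People", {"source": "constitution.txt"}),
    Chunk("Article I.", {"source": "constitution.txt"}),
]
ids = make_ids(chunks)
print(ids)  # ['constitution.txt-0', 'constitution.txt-1']
```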

### **Load into Astra DB**

In [33]:
from cassandra.query import SimpleStatement

# creation of the DB connection
cqlMode = 'astra_db'
session = getCQLSession(mode=cqlMode)
keyspace = getCQLKeyspace(mode=cqlMode)

ERROR:cassandra.connection:Closing connection <AsyncoreConnection(134194936100608) 8dce574d-98e5-4b4d-bdfe-2477340a5d09-us-east1.db.astra.datastax.com:29042:4826ba60-d274-4b3b-944f-51d42fe5ae61> due to protocol error: Error from server: code=000a [Protocol error] message="Beta version of the protocol used (5/v5-beta), but USE_BETA flag is unset"


Below is our modified version of `langchain.vectorstores.Cassandra`. The `from_documents` and `from_texts` methods have been changed to accept a list of IDs.

In [38]:
from __future__ import annotations

import typing
import uuid
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import numpy as np

if typing.TYPE_CHECKING:
    from cassandra.cluster import Session

from langchain.docstore.document import Document
from langchain.schema.embeddings import Embeddings
from langchain.schema.vectorstore import VectorStore
from langchain.vectorstores.utils import maximal_marginal_relevance

CVST = TypeVar("CVST", bound="Cassandra")


class Cassandra(VectorStore):
    """Wrapper around Apache Cassandra(R) for vector-store workloads.

    To use it, you need a recent installation of the `cassio` library
    and a Cassandra cluster / Astra DB instance supporting vector capabilities.

    Visit the cassio.org website for extensive quickstarts and code examples.

    Example:
        .. code-block:: python

                from langchain.vectorstores import Cassandra
                from langchain.embeddings.openai import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                session = ...             # create your Cassandra session object
                keyspace = 'my_keyspace'  # the keyspace should exist already
                table_name = 'my_vector_store'
                vectorstore = Cassandra(embeddings, session, keyspace, table_name)
    """

    _embedding_dimension: Union[int, None]

    @staticmethod
    def _filter_to_metadata(filter_dict: Optional[Dict[str, str]]) -> Dict[str, Any]:
        if filter_dict is None:
            return {}
        else:
            return filter_dict

    def _get_embedding_dimension(self) -> int:
        if self._embedding_dimension is None:
            self._embedding_dimension = len(
                self.embedding.embed_query("This is a sample sentence.")
            )
        return self._embedding_dimension

    def __init__(
        self,
        embedding: Embeddings,
        session: Session,
        keyspace: str,
        table_name: str,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        try:
            from cassio.vector import VectorTable
        except (ImportError, ModuleNotFoundError):
            raise ImportError(
                "Could not import cassio python package. "
                "Please install it with `pip install cassio`."
            )
        """Create a vector table."""
        self.embedding = embedding
        self.session = session
        self.keyspace = keyspace
        self.table_name = table_name
        self.ttl_seconds = ttl_seconds
        #
        self._embedding_dimension = None
        #
        self.table = VectorTable(
            session=session,
            keyspace=keyspace,
            table=table_name,
            embedding_dimension=self._get_embedding_dimension(),
            primary_key_type="TEXT",
        )

    @property
    def embeddings(self) -> Embeddings:
        return self.embedding

    @staticmethod
    def _dont_flip_the_cos_score(distance: float) -> float:
        # the identity
        return distance

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """
        The underlying VectorTable already returns a "score proper",
        i.e. one in [0, 1] where higher means more *similar*,
        so here the final score transformation is not reversing the interval:
        """
        return self._dont_flip_the_cos_score

    def delete_collection(self) -> None:
        """
        Just an alias for `clear`
        (to better align with other VectorStore implementations).
        """
        self.clear()

    def clear(self) -> None:
        """Empty the collection."""
        self.table.clear()

    def delete_by_document_id(self, document_id: str) -> None:
        return self.table.delete(document_id)

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector IDs.


        Args:
            ids: List of ids to delete.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """

        if ids is None:
            raise ValueError("No ids provided to delete.")

        for document_id in ids:
            self.delete_by_document_id(document_id)
        return True

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 16,
        ttl_seconds: Optional[int] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts (Iterable[str]): Texts to add to the vectorstore.
            metadatas (Optional[List[dict]], optional): Optional list of metadatas.
            ids (Optional[List[str]], optional): Optional list of IDs.
            batch_size (int): Number of concurrent requests to send to the server.
            ttl_seconds (Optional[int], optional): Optional time-to-live
                for the added texts.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        _texts = list(texts)  # lest it be a generator or something
        if ids is None:
            ids = [uuid.uuid4().hex for _ in _texts]
        if metadatas is None:
            metadatas = [{} for _ in _texts]
        #
        ttl_seconds = ttl_seconds or self.ttl_seconds
        #
        embedding_vectors = self.embedding.embed_documents(_texts)
        #
        for i in range(0, len(_texts), batch_size):
            batch_texts = _texts[i : i + batch_size]
            batch_embedding_vectors = embedding_vectors[i : i + batch_size]
            batch_ids = ids[i : i + batch_size]
            batch_metadatas = metadatas[i : i + batch_size]

            futures = [
                self.table.put_async(
                    text, embedding_vector, text_id, metadata, ttl_seconds
                )
                for text, embedding_vector, text_id, metadata in zip(
                    batch_texts, batch_embedding_vectors, batch_ids, batch_metadatas
                )
            ]
            for future in futures:
                future.result()
        return ids

    # id-returning search facilities
    def similarity_search_with_score_id_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
    ) -> List[Tuple[Document, float, str]]:
        """Return docs most similar to embedding vector.

        Args:
            embedding (str): Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
        Returns:
            List of (Document, score, id), the most similar to the query vector.
        """
        search_metadata = self._filter_to_metadata(filter)
        #
        hits = self.table.search(
            embedding_vector=embedding,
            top_k=k,
            metric="cos",
            metric_threshold=None,
            metadata=search_metadata,
        )
        # We stick to 'cos' distance as it can be normalized on a 0-1 axis
        # (1=most relevant), as required by this class' contract.
        return [
            (
                Document(
                    page_content=hit["document"],
                    metadata=hit["metadata"],
                ),
                0.5 + 0.5 * hit["distance"],
                hit["document_id"],
            )
            for hit in hits
        ]

    def similarity_search_with_score_id(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
    ) -> List[Tuple[Document, float, str]]:
        embedding_vector = self.embedding.embed_query(query)
        return self.similarity_search_with_score_id_by_vector(
            embedding=embedding_vector,
            k=k,
            filter=filter,
        )

    # id-unaware search facilities
    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to embedding vector.

        Args:
            embedding (str): Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
        Returns:
            List of (Document, score), the most similar to the query vector.
        """
        return [
            (doc, score)
            for (doc, score, docId) in self.similarity_search_with_score_id_by_vector(
                embedding=embedding,
                k=k,
                filter=filter,
            )
        ]

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        embedding_vector = self.embedding.embed_query(query)
        return self.similarity_search_by_vector(
            embedding_vector,
            k,
            filter=filter,
        )

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        return [
            doc
            for doc, _ in self.similarity_search_with_score_by_vector(
                embedding,
                k,
                filter=filter,
            )
        ]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
    ) -> List[Tuple[Document, float]]:
        embedding_vector = self.embedding.embed_query(query)
        return self.similarity_search_with_score_by_vector(
            embedding_vector,
            k,
            filter=filter,
        )

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.
        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.
        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        search_metadata = self._filter_to_metadata(filter)

        prefetchHits = self.table.search(
            embedding_vector=embedding,
            top_k=fetch_k,
            metric="cos",
            metric_threshold=None,
            metadata=search_metadata,
        )
        # let the mmr utility pick the *indices* in the above array
        mmrChosenIndices = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            [pfHit["embedding_vector"] for pfHit in prefetchHits],
            k=k,
            lambda_mult=lambda_mult,
        )
        mmrHits = [
            pfHit
            for pfIndex, pfHit in enumerate(prefetchHits)
            if pfIndex in mmrChosenIndices
        ]
        return [
            Document(
                page_content=hit["document"],
                metadata=hit["metadata"],
            )
            for hit in mmrHits
        ]

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.
        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.
        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Optional.
        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        embedding_vector = self.embedding.embed_query(query)
        return self.max_marginal_relevance_search_by_vector(
            embedding_vector,
            k,
            fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
        )

    @classmethod
    def from_texts(
        cls: Type[CVST],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        batch_size: int = 16,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> CVST:
        """Create a Cassandra vectorstore from raw texts.

        The original had no support for specifying text IDs; this version
        accepts ids as an optional parameter.

        Returns:
            a Cassandra vectorstore.
        """
        session: Session = kwargs["session"]
        keyspace: str = kwargs["keyspace"]
        table_name: str = kwargs["table_name"]
        cassandraStore = cls(
            embedding=embedding,
            session=session,
            keyspace=keyspace,
            table_name=table_name,
        )
        cassandraStore.add_texts(
            texts=texts, metadatas=metadatas, ids=ids, batch_size=batch_size
        )
        return cassandraStore

    @classmethod
    def from_documents(
        cls: Type[CVST],
        documents: List[Document],
        embedding: Embeddings,
        batch_size: int = 16,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> CVST:
        """Create a Cassandra vectorstore from a document list.

        The original had no support for specifying text IDs; this version
        accepts ids as an optional parameter.

        Returns:
            a Cassandra vectorstore.
        """
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        session: Session = kwargs["session"]
        keyspace: str = kwargs["keyspace"]
        table_name: str = kwargs["table_name"]
        return cls.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=embedding,
            batch_size=batch_size,
            session=session,
            keyspace=keyspace,
            table_name=table_name,
            ids=ids,
        )
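
One caveat when supplying your own IDs: `add_texts` pairs texts and IDs with `zip`, which stops at the shorter sequence, so a too-short ID list would silently drop rows. Duplicate IDs would overwrite each other, because Cassandra inserts are upserts. A small pre-flight check, sketched here with a hypothetical `validate_ids` helper, can catch both before anything is written.

```python
from collections import Counter
from typing import List, Sequence


def validate_ids(texts: Sequence[str], ids: Sequence[str]) -> None:
    """Hypothetical pre-flight check; raises ValueError on bad input."""
    if len(ids) != len(texts):
        raise ValueError(f"Expected {len(texts)} ids, got {len(ids)}.")
    duplicates: List[str] = [i for i, n in Counter(ids).items() if n > 1]
    if duplicates:
        raise ValueError(f"Duplicate ids: {duplicates}")


validate_ids(["a", "b"], ["0", "1"])  # passes silently
```

Calling it as `validate_ids(docs, stringids)` before `Cassandra.from_documents(...)` would be one way to use it in this notebook.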

First we insert into a table `langchain_constitution1` without specifying IDs. We will see that the row IDs are UUIDs.

In [41]:
embeddings_function = OpenAIEmbeddings()

# use LangChain's Cassandra integration to load the chunked documents into Astra
docsearch = Cassandra.from_documents(
    documents=docs,
    embedding=embeddings_function,
    session=session,
    keyspace=keyspace,
    table_name='langchain_constitution1',
)



In [50]:
import pandas as pd

# query the keyspace chosen earlier instead of a hard-coded name
rows = session.execute(f"SELECT row_id, body_blob FROM {keyspace}.langchain_constitution1 LIMIT 5")

results = pd.DataFrame(rows)
results


|   | row_id | body_blob |
|---|--------|-----------|
| 0 | 87c11bff6342478496ad348910f2620a | During the course of our history, in addition ... |
| 1 | d7cd79c488ba4570a5605a0ef5748344 | Proposal and Ratification |
| 2 | a5d54d0251a64ca9944f95da9d3909a4 | Proposal and Ratification |
| 3 | 359c562a5b95455687ae8e6f819e5435 | Geo: Read\n ... |
| 4 | 7f2bc784c32e4f4eb6a6f6e94b25b2b2 | Historical Note.................................. |
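
The `row_id` values above are 32-character hex strings, which is what `uuid.uuid4().hex` in `add_texts` produces. A quick way to confirm this with the standard library (`is_uuid4_hex` is a hypothetical helper name):

```python
import uuid


def is_uuid4_hex(value: str) -> bool:
    """Return True if value is the 32-character hex form of a version-4 UUID."""
    if len(value) != 32:
        return False
    try:
        return uuid.UUID(hex=value).version == 4
    except ValueError:
        return False


print(is_uuid4_hex("87c11bff6342478496ad348910f2620a"))  # True
print(is_uuid4_hex("6"))  # False
```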


Next we insert into a table `langchain_constitution2` with the IDs specified. We will see that the row IDs are the integer strings we supplied.

In [47]:
embeddings_function = OpenAIEmbeddings()

# use LangChain's Cassandra integration to load the chunked documents into Astra
docsearch = Cassandra.from_documents(
    documents=docs,
    embedding=embeddings_function,
    session=session,
    keyspace=keyspace,
    ids=stringids,
    table_name='langchain_constitution2',
)



In [51]:
# query the keyspace chosen earlier instead of a hard-coded name
rows = session.execute(f"SELECT row_id, body_blob FROM {keyspace}.langchain_constitution2 LIMIT 5")

results = pd.DataFrame(rows)
results


|   | row_id | body_blob |
|---|--------|-----------|
| 0 | 6 | Article II. |
| 1 | 90 | Dangers as will not admit of delay. No ... |
| 2 | 46 | Article [XX.] |
| 3 | 115 | R |
| 4 | 55 | This amendment was proposed to the legislature... |
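
The rows come back in an order that is neither numeric nor insertion order. Cassandra returns unfiltered reads in partition-token order, which is likely what we are seeing here. What matters is that every returned key is one we supplied, and that is easy to check. The `returned` list below is copied from the output above.

```python
# IDs supplied at load time: '0' .. '127' (len(docs) was 128 in this run).
stringids = [str(i) for i in range(128)]

# row_id values copied from the query output.
returned = ["6", "90", "46", "115", "55"]

supplied = set(stringids)
unexpected = [row_id for row_id in returned if row_id not in supplied]
print(unexpected)  # []
```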
