In [1]:
from openai import OpenAI
from datetime import datetime
import hashlib
import re
import os
from tqdm import tqdm
import numpy as np
from bs4 import BeautifulSoup
from urllib.request import urlopen
import logging
from pinecone import Pinecone, PodSpec
from typing import List
import configparser
# import erdantic

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

In [2]:
config = configparser.ConfigParser()
config.read('config.ini')
PINECONE_INDEX_NAME = config['DEFAULT']['PineconeIndexName']
NAMESPACE = config['DEFAULT']['Namespace']
EMBEDDING_MODEL = config['DEFAULT']['EmbeddingModel']
ENVIRONMENT = config['DEFAULT']['Environment']

In [3]:
class MissingEnvironmentVariable(Exception):
    pass
    
class PineconeConnection():
    """Gets pinecone key from the environment and passes it to Pinecone.  Performs Pinecone-related tasks."""

    def __init__(self, pinecone_index_name: str = PINECONE_INDEX_NAME, env: str = ENVIRONMENT, namespace: str = NAMESPACE):
        """
        Keyword Arguments:
        pinecone_index_name -- An index is like a table.  It will be created in the Pinecone vector database if it does not already exist.
        env -- An environment can contain multiple indexes, but in this case it only needs one.
        namespace -- An index can contain multiple namespaces, but in this case it only needs one.
        """
        pinecone_key = os.environ.get('PINECONE_API_KEY', False)
        self._pinecone_index_name = pinecone_index_name
        self._namespace = namespace
        self._env = env
        if pinecone_key:
            self._pc = Pinecone(api_key=pinecone_key, environment=env)
        else:
            raise MissingEnvironmentVariable("PROBLEM: no PINECONE_API_KEY in environment")

    def create_pinecone_index(self):
        """Creates an index in Pinecone, which is like a table, and store it in a member variable."""
        list_indexes = self._pc.list_indexes()
        list_names = [x['name'] for x in list_indexes.indexes]
        if not self._pinecone_index_name in list_names:
            pc.create_index(name=self._pinecone_index_name, dimension=embedding_dimension, metric='cosine', spec=PodSpec(environment=self._env, pod_type='p1.x1'))
        self._pinecone_index = self._pc.Index(self._pinecone_index_name)

    def upsert_documents(self, documents: List[str], oaic: 'OpenAIConnection'):
        """
        Arguments:
        documents -- a list of documents to upsert
        Keyword Arguments:
        oaic -- an OpenAIConnection tp which the documents will be upserted
        """
        now = datetime.utcnow()
    
        embeddings = oaic.get_embeddings_from_response(oaic.get_response_from_documents(documents))
    
        pinecone_list = list()
        for emb, doc in zip(embeddings, documents):
            pinecone_list.append(
                {
                    "id": hashlib_sha(doc),
                    "values": emb,
                    "metadata": {"doc": doc, "timestamp": now}
                }
            ) 
    
        total_upserted = self._pinecone_index.upsert(
            vectors=pinecone_list,
            namespace=self._namespace
        )['upserted_count']
    
        return total_upserted

    def choose_document_from_pinecone(self, query: str, oaic: 'OpenAIConnection', top_k: int = 3):
        # get embedding from THE SAME embedder as the documents
        query_embedding = oaic.get_embedding_from_document(query)
    
        return self._pinecone_index.query(
          vector=query_embedding,
          top_k=top_k,
          namespace=self._namespace,
          include_metadata=True
        ).get('matches')

    def delete_documents(self, documents: List[str]):
        hashes = [hashlib_sha(doc) for doc in documents]
        return self._pinecone_index.delete(ids=hashes, namespace=self._namespace)

class OpenAIConnection:
    """Gets Open AI key from the environment and passes it to OpenAI.  Performs OpenAI-related tasks."""

    model: str
    
    def __init__(self, model: str = EMBEDDING_MODEL):
        api_key = os.environ.get("OPENAI_API_KEY", False)
        self._model = model
        if api_key:
            self.client = OpenAI(api_key=api_key)
        else:
            raise MissingEnvironmentVariable("PROBLEM: no OPENAI_API_KEY in environment")
        self.embedding_dimension = len(self.get_embedding_from_document('these are some words to test the embedding dimension'))
    
    def get_response_from_documents(self, documents: List[str]):
        """
        Arguments:
        documents -- A list of documents (strings) that will be parsed by the model, returning embeddings in the form of a response
        model -- The model that parses the documents
        """
        return self.client.embeddings.create(input = documents, model = self._model)

    def get_embeddings_from_response(self, resp) -> List[List[float]]:
        """
        Returns a list of embeddings
        Arguments:
        resp -- A response that contains embeddings in resp.data
        """
        return [item.embedding for item in resp.data]

    def get_embedding_from_document(self, document: str) -> List[float]:
        """
        Arguments:
        document -- A str that will be used to create on embedding
        """
        return self.get_embeddings_from_response(self.get_response_from_documents([document]))[0]

In [4]:
def hashlib_sha(somestring: str) -> str:
    """
    Arguments:
    somestring -- We will make a hash of this string
    Return value:
    The hash
    """
    sha = hashlib.sha256()
    sha.update(somestring.encode())
    return sha.hexdigest()

print(hashlib_sha('Make a hash of this string'))

768d2bb50eca2cb375e681635b7a4082b65a8868d1be9fae718d26382d5c947b


In [5]:
def make_ttc_list(print_list : bool = False) -> List[str]:
    """
    Return value:
    A list of Tao Te Ching passages from wikisource
    """
    ttc_list = list()
    with urlopen('https://en.wikisource.org/wiki/Translation:Tao_Te_Ching') as response:
        soup = BeautifulSoup(response, 'html.parser')
        num = 1
        for anchor in soup.find_all('p'):
            if(re.match("[a-zA-Z]", anchor.text[0]) and "Note:" not in anchor.text):
                if(print_list):
                    print(num)
                    print("---------------")
                    print(anchor.text)
                num += 1
                ttc_list.append(str(num) + ". " + anchor.text)
                if(print_list):
                    print("---------------")
            if("Truthful words are not pleasant," in anchor.text):
                break
    return ttc_list

In [11]:
def main():
    ttc_list = make_ttc_list()
    my_pc = PineconeConnection()
    my_pc.create_pinecone_index()
    my_oaic = OpenAIConnection()
    print("Now testing Pinecone.  This may require a slight wait, while pinecone does the upsert.")
    my_pc.upsert_documents(ttc_list, my_oaic)
    my_pc.choose_document_from_pinecone('What is the Tao?', my_oaic, top_k = 1)
    print("Test is done.")
    print("Enter your question:")
    x = input()
    pinecone_response = my_pc.choose_document_from_pinecone(x, my_oaic, top_k  = 1)
    text = pinecone_response[0]["metadata"]["doc"]
    query = f"""
    I have a question as well as a quote from the Tao Te Ching.  Please answer the question using the quote, and explain how the quote
    suggests your answer.  The question is: {x}
    And the quote from the Tao Te Ching is:
    -----------
    {text}
    -----------
    Now please answer the question with reference to the information in the quote."""
    chat_completion = my_oaic.client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": query,
            }
        ],
        model="gpt-3.5-turbo",
    )
    print("ANSWER TO YOUR QUESTION")
    print(chat_completion.choices[0].message.content)
    print("I USED THE FOLLOWING QUOTE FROM THE TAO TE CHING")
    print(text)
    print("Now deleting documents - run this command when you are done.")
    my_pc.delete_documents(ttc_lst)

In [None]:
main()

Now testing Pinecone.  This may require a slight wait, while pinecone does the upsert.


  now = datetime.utcnow()


Test is done.
Enter your question:


 Do students learn more by reading or by solving homework exercises?
