# Start the InterSystems IRIS vector database (in a terminal)

# Define constants

In [1]:
import os
# System prompt sent to the LLM as the 'system' message of every conversation.
PERSONA = """
You are a Healthcare Customer Support Assistant. Your role is to assist users with inquiries strictly related to Healthcare, based on the context provided. Do not answer queries outside of context provided.
Guidelines:
1. Provide factually correct responses in natural language. Do not give details you don't have.
2. Keep answers very brief and relevant to the question.
3. Always provide complete answers and avoid asking follow-up questions.
4. If a question is unclear and you cannot formulate a proper response, reply formally e.g., "Sorry, I can't understand. Can you please rephrase it?"
5. Never request personal information from users.
6. Respond in English ONLY
7. Do Not answer questions outside the scope of provided context.
"""
OPENAI_LLM = "gpt-4-turbo"
# Canned assistant turn placed between the retrieved context and the user's question.
ASSISTANT_RESPONSE = 'Content Understood! Go ahead and ask questions.'

# Connection settings for the IRIS instance that stores the text chunks and vectors.
USERNAME = 'demo'
PASSWORD = 'demo'
HOSTNAME = 'localhost'
NAMESPACE = 'USER'
PORT = '1972'
TABLE_NAME = 'intersystems_table'
CONNECTION_STRING = f"iris://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{NAMESPACE}"
# Prefer the OPENAI_API_KEY environment variable so the secret never has to be
# written into the notebook; the placeholder is only a fallback to be replaced.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY') or '<you-openai-api-key>'

# Database operations

In [2]:
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import create_engine, text

# Engines are cached per connection string: a SQLAlchemy engine owns a connection
# pool and is meant to be created once and shared, not rebuilt on every call.
_ENGINE_CACHE = {}

def create_database_engine():
    """
    Returns a SQLAlchemy engine for CONNECTION_STRING, creating it on first use.

    Subsequent calls with the same CONNECTION_STRING reuse the cached engine
    instead of building a new engine (and connection pool) each time.

    Returns:
        The engine, or None if it could not be created (the error is printed).
    """
    try:
        engine = _ENGINE_CACHE.get(CONNECTION_STRING)
        if engine is None:
            engine = create_engine(CONNECTION_STRING)
            _ENGINE_CACHE[CONNECTION_STRING] = engine
        return engine
    except Exception as e:
        print(f"Failed to create database engine: {e}")
        return None

def remove_table():
    """
    Drops TABLE_NAME from the database when it is present.

    Returns:
        True once the DROP statement has run; False when no engine is available
        or the statement raises a SQLAlchemyError (the error is printed).
    """
    db_engine = create_database_engine()
    if not db_engine:
        return False

    try:
        with db_engine.connect() as connection, connection.begin():
            statement = f"DROP TABLE IF EXISTS {TABLE_NAME}"
            connection.execute(text(statement))
    except SQLAlchemyError as error:
        print(f"Failed to drop table: {error}")
        return False
    return True

def create_data_table():
    """
    Creates TABLE_NAME with a VARCHAR(1200) `text` column and a
    VECTOR(DOUBLE, 384) `text_vector` column.

    Returns:
        True once the CREATE statement has run; False when no engine is available
        or the statement raises a SQLAlchemyError (the error is printed).
    """
    db_engine = create_database_engine()
    if not db_engine:
        return False

    try:
        with db_engine.connect() as connection, connection.begin():
            ddl = f"CREATE TABLE {TABLE_NAME} (text VARCHAR(1200), text_vector VECTOR(DOUBLE, 384))"
            connection.execute(text(ddl))
    except SQLAlchemyError as error:
        print(f"Failed to create table: {error}")
        return False
    return True

def verify_table_existence():
    """
    Probes TABLE_NAME with a query that selects no rows to see whether it can be read.

    Returns:
        True when the probe query runs; False when no engine is available or the
        query raises a SQLAlchemyError (the error is printed).
    """
    db_engine = create_database_engine()
    if not db_engine:
        return False

    try:
        with db_engine.connect() as connection, connection.begin():
            # WHERE 1=0 never matches, so the query only tests that the table is queryable.
            probe = f"SELECT 1 FROM {TABLE_NAME} WHERE 1=0"
            connection.execute(text(probe))
    except SQLAlchemyError as error:
        print(f"Table check failed - Table does not exist: {error}")
        return False
    return True

def insert_table_data(df):
    """
    Inserts every row of a DataFrame into TABLE_NAME inside a single transaction.

    Args:
        df: DataFrame with a 'text' column and a 'text_vector' column; each vector
            is passed to TO_VECTOR() as its string representation.

    Returns:
        True when all rows were inserted; False when no engine is available or an
        insert raises a SQLAlchemyError (the error is printed).
    """
    engine = create_database_engine()
    if not engine:
        return False

    # Built once: the statement does not depend on the row being inserted.
    insert_sql = text(f"""
        INSERT INTO {TABLE_NAME}
        (text, text_vector)
        VALUES (:text, TO_VECTOR(:text_vector))
    """)
    try:
        with engine.connect() as conn:
            with conn.begin():
                for _, row in df.iterrows():
                    conn.execute(insert_sql, {
                        'text': row['text'],
                        'text_vector': str(row['text_vector'])
                    })
        return True
    except SQLAlchemyError as e:
        print(f"Data insertion failed: {e}")
        return False

def perform_vector_search(question_embedding, top_k=10):
    """
    Searches TABLE_NAME for the rows whose vectors best match the question embedding.

    Rows are ranked by VECTOR_DOT_PRODUCT against the embedding, highest first.

    Args:
        question_embedding: Embedding of the question; passed to TO_VECTOR() as its
            string representation.
        top_k: Maximum number of rows to return (default 10, the previous fixed value).

    Returns:
        A list of the matching `text` values, or an empty list when top_k is less
        than 1, no engine is available, or the query raises a SQLAlchemyError.
    """
    # Coerce to int before formatting so only a number can reach the SQL text.
    top_k = int(top_k)
    if top_k < 1:
        print(f"Vector search skipped: top_k must be at least 1, got {top_k}")
        return []

    engine = create_database_engine()
    if not engine:
        return []

    try:
        search_sql = text(f"""
            SELECT TOP {top_k} text FROM {TABLE_NAME}
            ORDER BY VECTOR_DOT_PRODUCT(text_vector, TO_VECTOR(:search_vector)) DESC
        """)
        with engine.connect() as conn:
            with conn.begin():
                results = conn.execute(search_sql, {'search_vector': str(question_embedding)}).fetchall()
                return [result[0] for result in results]
    except SQLAlchemyError as e:
        print(f"Vector search failed: {e}")
        return []

def count_table_records():
    """
    Returns the number of rows in TABLE_NAME.

    Returns:
        The row count, or 0 when no engine is available or the query raises a
        SQLAlchemyError (the error is printed, matching the other table helpers).
    """
    engine = create_database_engine()
    if not engine:
        return 0

    try:
        with engine.connect() as conn:
            result = conn.execute(text(f"SELECT COUNT(*) FROM {TABLE_NAME}"))
            return result.fetchone()[0]
    except SQLAlchemyError as e:
        print(f"Failed to count records: {e}")
        return 0

In [3]:
# Report whether the vector table is currently queryable.
print("Table exists." if verify_table_existence() else "Table does not exist.")

Table exists.


In [4]:
# Reset the table to a known-empty state. The order matters: drop, re-create,
# confirm the table is queryable, then show that it holds no rows yet.
print("Attempting to remove existing table:", remove_table())
print("Attempting to create table:", create_data_table())
print("Verify table creation:", verify_table_existence())
print("Record count in table:", count_table_records())

Attempting to remove existing table: True
Attempting to create table: True
Verify table creation: True
Record count in table: 0


# Helper functions

In [5]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from sentence_transformers import SentenceTransformer
import pandas as pd
import re
from io import BytesIO
import PyPDF2

# Shared sentence-embedding model used for both stored chunks and questions.
# NOTE(review): the table schema declares VECTOR(DOUBLE, 384); presumably this
# model's output dimension matches — confirm before swapping models.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

def extract_text_from_file_path(file_path):
    """
    Extract text from the file at the provided path.

    Supports '.txt' (read as UTF-8) and '.pdf' files; the extension is matched
    case-insensitively, as segment_text() already does.

    Args:
        file_path: Path to the file to read.

    Returns:
        The extracted text, or None (after printing a message) for any other extension.
        For PDFs, each page's text is followed by a newline.
    """
    normalized_path = file_path.lower()

    if normalized_path.endswith('.txt'):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    if normalized_path.endswith('.pdf'):
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # Defensive: treat a page that yields no text (None/empty) as an empty
            # string instead of failing on None + '\n'.
            return ''.join((page.extract_text() or '') + '\n' for page in pdf_reader.pages)

    print("Unsupported file format. Acceptable formats include .txt and .pdf.")
    return None

def extract_text_from_file(file_stream):
    """
    Extract text from the provided file stream.

    The stream's `type` attribute selects the parser: a value containing
    'text/plain' is decoded as UTF-8, a value containing 'pdf' is parsed with PyPDF2.

    Args:
        file_stream: Seekable binary stream exposing a `type` attribute.

    Returns:
        The extracted text, or None (after printing a message) when the type is
        missing or unsupported. For PDFs, each page's text is followed by a newline.
    """
    # A stream without a usable `type` is reported as unsupported rather than raising.
    file_type = getattr(file_stream, 'type', None) or ''

    if 'text/plain' in file_type:
        file_stream.seek(0)
        return file_stream.read().decode('utf-8')

    if 'pdf' in file_type:
        file_stream.seek(0)
        pdf_reader = PyPDF2.PdfReader(file_stream)
        # Defensive: treat a page that yields no text (None/empty) as an empty
        # string instead of failing on None + '\n'.
        return ''.join((page.extract_text() or '') + '\n' for page in pdf_reader.pages)

    print("Unsupported file format. Acceptable formats include .txt and .pdf.")
    return None

def segment_text(file_path):
    """
    Segments the file at the given path into chunks, based on its extension.

    '.txt' files are read as UTF-8 and split into 800-character chunks with a
    50-character overlap. '.pdf' files are loaded with PyPDFLoader and the loaded
    documents are split into 1000-character chunks with a 200-character overlap.
    The extension is matched case-insensitively.

    Args:
        file_path: Path to the file to segment.

    Returns:
        The list of chunk documents produced by the splitter, or None (after
        printing a message) for any other extension.
    """
    normalized_path = file_path.lower()

    if normalized_path.endswith('.txt'):
        # Explicit encoding, consistent with extract_text_from_file_path().
        with open(file_path, 'r', encoding='utf-8') as file:
            full_text = file.read()

        text_segmenter = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n", ' ', ''],
            chunk_size=800,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False
        )
        return text_segmenter.create_documents([full_text])

    if normalized_path.endswith('.pdf'):
        documents = PyPDFLoader(file_path).load()
        text_segmenter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        # The loader returns document objects, so split them with split_documents();
        # create_documents() takes raw strings and was being handed documents.
        return text_segmenter.split_documents(documents)

    print("Unsupported file format. Acceptable formats include .txt and .pdf.")
    return None

def sanitize_text(raw_text):
    """
    Cleans and standardizes a text string.

    Steps, in order: turn every whitespace run into one space, drop characters
    outside letters, digits, spaces and . , ; : ' " ( ) [ ] -, put a space on both
    sides of . , ; : ' ", collapse repeated spaces, separate digits from adjacent
    letters, and trim the ends.

    Args:
        raw_text: The string to clean.

    Returns:
        The cleaned string, or raw_text unchanged if processing raises (the error
        is printed).
    """
    try:
        # Normalize whitespace BEFORE filtering characters: the filter below keeps
        # only the plain space, so a newline or tab would otherwise be deleted and
        # the words on either side of a line break would be glued together.
        cleaned_text = re.sub(r'\s+', ' ', raw_text)
        cleaned_text = re.sub(r'[^a-zA-Z0-9 .,;:\'"\(\)\[\]\-]', '', cleaned_text)
        # Insert a space after, then before, each of the listed punctuation marks.
        cleaned_text = re.sub(r'(?<=[.,;:\'"])(?=[^\s])', ' ', cleaned_text)
        cleaned_text = re.sub(r'(?<=[^\s])(?=[.,;:\'"])', ' ', cleaned_text)
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
        # Split digit/letter runs that touch, e.g. "400mg" -> "400 mg".
        cleaned_text = re.sub(r'(\d+)([A-Za-z])', r'\1 \2', cleaned_text)
        cleaned_text = re.sub(r'([A-Za-z])(\d+)', r'\1 \2', cleaned_text)
        return cleaned_text.strip()
    except Exception as e:
        print(f"Error while processing text: {e}")
        return raw_text

def create_text_chunks(input_text):
    """
    Splits a large text into chunks of up to 800 characters (50-character overlap)
    and returns the sanitized content of each chunk.
    """
    splitter_options = dict(
        separators=["\n\n", "\n", ' ', ''],
        chunk_size=800,
        chunk_overlap=50,
        length_function=len,
        is_separator_regex=False,
    )
    splitter = RecursiveCharacterTextSplitter(**splitter_options)

    cleaned_chunks = []
    for document in splitter.create_documents([input_text]):
        cleaned_chunks.append(sanitize_text(document.page_content))
    return cleaned_chunks

def construct_dataframe(text):
    """
    Builds a DataFrame with one row per text chunk: the chunk in 'text' and its
    normalized sentence embedding (as a list) in 'text_vector'.
    """
    frame = pd.DataFrame(create_text_chunks(text), columns=["text"])
    chunk_texts = frame['text'].tolist()
    vectors = embedding_model.encode(chunk_texts, normalize_embeddings=True)
    frame['text_vector'] = vectors.tolist()
    return frame

def generate_text_embedding(input_text):
    """
    Encodes a text string with the shared embedding model and returns the
    normalized embedding as a plain list.
    """
    embedding = embedding_model.encode(input_text, normalize_embeddings=True)
    return embedding.tolist()

def concatenate_vector_results(results):
    """
    Joins a list of strings into one string, separated by single spaces.
    """
    separator = ' '
    return separator.join(results)



# LLM API call

In [6]:
from openai import OpenAI, APIError, APIConnectionError, RateLimitError

# Shared OpenAI client, configured with the key from the constants cell.
openai_client = OpenAI(api_key=OPENAI_API_KEY)

def retrieve_response_from_llm(question, combined_vectors):
    """
    Asks the OpenAI large language model (LLM) to answer a question from the
    supplied context.

    The conversation is: the PERSONA system prompt, the context as a user turn,
    the canned ASSISTANT_RESPONSE, then the question as the final user turn.

    Returns:
        The model's answer, or a short error message string when the API call
        fails (the underlying error is printed).
    """
    try:
        turns = [
            ('system', PERSONA),
            ('user', str(combined_vectors)),
            ('assistant', ASSISTANT_RESPONSE),
            ('user', question),
        ]
        conversation = [{'role': role, 'content': content} for role, content in turns]
        completion = openai_client.chat.completions.create(
            messages=conversation,
            model=OPENAI_LLM,
            temperature=0.6,
        )
        return completion.choices[0].message.content

    except APIConnectionError as error:
        print(f"Connection to OpenAI API failed: {error}")
        return "Connection to OpenAI API failed."

    except RateLimitError as error:
        print(f"Rate limit exceeded for OpenAI API requests: {error}")
        return "Rate limit exceeded for OpenAI API requests."

    except APIError as error:
        print(f"An error was returned by the OpenAI API: {error}")
        return "An error was returned by the OpenAI API."

    except Exception as error:
        print(f"An unexpected error occurred: {error}")
        return "An unexpected error occurred. Please try again later."

# Abstracted functions

In [7]:
def initialize_database_and_insert_data(uploaded_file):
    """
    Rebuilds the table from a file: extracts its text, embeds it, drops and
    re-creates the table, then inserts the embedded chunks.

    Args:
        uploaded_file: Path to a .txt or .pdf file.

    Returns:
        The insertion status from insert_table_data(), or False when no text could
        be extracted (in which case the existing table is left untouched).
    """
    # Read and embed the file BEFORE touching the table, so an unsupported or
    # unreadable file cannot wipe the existing data and then fail on None text.
    document_text = extract_text_from_file_path(uploaded_file)
    if document_text is None:
        print("No text could be extracted from the file; the table was left unchanged.")
        return False
    dataframe = construct_dataframe(document_text)

    print("Attempting to drop existing table: ", remove_table())
    print("Attempting to create a new table: ", create_data_table())
    print("Current records in the table: ", count_table_records())

    insertion_status = insert_table_data(dataframe)
    print("Updated records in the table: ", count_table_records())
    return insertion_status

def generate_response_to_question(question):
    """
    Answers a question with the OpenAI large language model, using passages
    retrieved from the database by vector search as context.

    Returns:
        A tuple of (answer, first five retrieved passages). All retrieved passages
        are sent to the model as context; only the first five are returned.
    """
    matches = perform_vector_search(generate_text_embedding(question))
    context = concatenate_vector_results(matches)
    answer = retrieve_response_from_llm(question, context)
    return answer, matches[:5]

In [8]:
# Source document to load into the vector table (path relative to the notebook).
file_path = "./sample_data/large_document.txt"

In [9]:
# Rebuild the table from file_path; the cell's displayed value is the insertion status.
initialize_database_and_insert_data(file_path)

Attempting to drop existing table:  True
Attempting to create a new table:  True
Current records in the table:  0
Updated records in the table:  503


True

In [10]:
# Ask a sample question; returns the LLM answer and the first five retrieved passages.
answer, matching_docs = generate_response_to_question("What is the recommended dose?")

In [11]:
# Show the model's answer as plain text.
print(answer)

The recommended dose for Imatinib Teva varies by condition:

1. For HESCEL (Hypereosinophilic Syndrome/Chronic Eosinophilic Leukemia), the recommended dose is 100 mg per day, which can be increased to 400 mg per day based on response and absence of adverse reactions.
   
2. For GIST (Gastrointestinal Stromal Tumors), the recommended dose is 400 mg per day for adults with unresectable and/or metastatic malignant GIST. There is limited data on the effects of dose increases from 400 mg to 600 mg or 800 mg in patients progressing at a lower dose.

For other conditions mentioned, such as CML (Chronic Myeloid Leukemia) and Ph-positive ALL (Philadelphia chromosome-positive Acute Lymphoblastic Leukemia), specific doses were not detailed in your text and typically depend on factors like patient response and specific treatment protocols. Always follow the prescribing doctor's instructions and guidelines.


In [12]:
# Inspect the retrieved passages that were returned alongside the answer.
matching_docs

['For CML and GIST , your doctor may prescribe a higher or lower dose depending on how you respond to the treatment . If your daily dose is 800 mg (2 capsules) , you should take one capsule in the morning and a second capsule in the evening . If you are being treated for Ph-positive ALL : The starting dose is 600 mg to be taken as 1 capsule of 400 mg plus 2 capsules of 100 mg once a day . If you are being treated for MDSMPD : The starting dose is 400 mg , to be taken as 1 capsule once a day . If you are being treated for HESCEL : The starting dose is 100 mg , to be taken as 1 capsule of 100 mg once a day . Your doctor may decide to increase the dose to 400 mg , to be taken as 1 capsule of 400 mg once a day , depending on how you respond to treatment . If you are being treated for DFSP :',
 'For CML and GIST , your doctor may prescribe a higher or lower dose depending on how you respond to the treatment . If your daily dose is 800 mg (8 tablets) , you should take 4 tablets in the mornin