In [None]:
import os
import logging
import ssl
import nltk
import requests
import zipfile
import json
import re
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
from elasticsearch import Elasticsearch
from ibm_watson_machine_learning.foundation_models import ModelInference

In [None]:
# Function to extract data from PDF using an API

def extract_data_from_pdf(input_doc):
    """
    Upload a PDF to the processing API and save the returned zip archive.

    Args:
        input_doc (str): Path of the PDF file to upload.

    Returns:
        str | None: Path of the saved "<input stem>_results.zip" file, or None
        when the API answers with a non-200 status or a non-zip content type.

    Raises:
        OSError: If ``input_doc`` cannot be opened (e.g. FileNotFoundError).
        Exceptions raised by ``requests.post`` propagate unchanged.
    """
    # Set the URL for the API
    url = "http://xxx.com/api/v1/task/process"
    print("URL being accessed:", url)  # Debug: print the URL being accessed

    # Headers
    headers = {'accept': 'application/octet-stream'}

    # The context manager closes the PDF even when the request raises;
    # previously the handle leaked on any network error.
    with open(input_doc, 'rb') as pdf_file:
        # Multipart payload: two JSON form fields plus the PDF itself
        files = {
            'parameters': (None, '{}', 'application/json'),
            'model_id': (None, 'processor', 'application/json'),
            'inputs.file': (input_doc, pdf_file, 'application/pdf')
        }
        response = requests.post(url, headers=headers, files=files)

    # Check if the response was successful
    if response.status_code != 200:
        print(f"Failed to process document: {response.status_code} - {response.text}")
        return None

    # Check if the content type is 'application/zip'
    if 'application/zip' not in response.headers.get('Content-Type', ''):
        print("Received incorrect file type, expected application/zip")
        return None

    # Strip the real extension instead of blindly dropping the last four
    # characters, so names without a ".pdf"-length suffix are not mangled.
    output_path = f"{os.path.splitext(input_doc)[0]}_results.zip"

    # Write the response content to a file
    with open(output_path, 'wb') as f:
        f.write(response.content)

    print(f"Results saved to {output_path}")
    return output_path

# Function to unzip a file
def unzip_file(zip_file_path, extract_to_folder):
    """
    Extract every member of a zip archive into a folder.

    Args:
        zip_file_path (str | None): Path of the archive; None means there is
            nothing to extract and only a message is printed.
        extract_to_folder (str): Destination passed to ``ZipFile.extractall``.

    Returns:
        None
    """
    # Guard clause: nothing to do without an archive path
    if zip_file_path is None:
        print("No zip file provided to unzip")
        return

    with zipfile.ZipFile(zip_file_path, 'r') as archive:
        print(f"unzip folder:  {extract_to_folder}")
        archive.extractall(extract_to_folder)



In [None]:
# Send the input PDF to the extraction API and unpack the returned zip archive.
# NOTE(review): a later cell reads "output.json" — presumably the archive
# contains that file; confirm against the API's actual output.
#input_pdf_file = "NYSE_DDS_2023.pdf"
input_pdf_file ="IBM_Annual_Report_2022.pdf"
# Path of the saved results zip, or None when the API call did not succeed
ziped_pdf_data = extract_data_from_pdf(input_pdf_file)
# Empty string is handed to ZipFile.extractall as the target path
# (presumably resolves relative to the current working directory — confirm).
extract_to_folder = ''
# Only unzip when the extraction step actually produced an archive
if ziped_pdf_data:
    unzip_file(ziped_pdf_data, extract_to_folder)


In [None]:
# Text Extraction from JSON

def add_newline_after_colon_and_number(input_string):
    """
    Insert a line break after every colon and the numeric run that follows it.

    Starting at each ":", the scan swallows any following digits, hyphens and
    spaces; that span (trailing spaces trimmed) is emitted followed by " \\n".
    A colon with no number after it still gets the line break.

    Args:
        input_string (str): Input string to process.

    Returns:
        str: The processed text, stripped of surrounding whitespace and
        terminated with a single newline.
    """
    pieces = []
    length = len(input_string)
    pos = 0
    while pos < length:
        current = input_string[pos]
        if current != ":":
            # Ordinary character: copy through unchanged
            pieces.append(current)
            pos += 1
            continue

        # Advance past the digits / hyphens / spaces that trail the colon
        end = pos + 1
        while end < length and (input_string[end].isdigit() or input_string[end] in "- "):
            end += 1

        # Emit colon + number as one unit, then break the line
        pieces.append(input_string[pos:end].strip() + " \n")
        pos = end

    return "".join(pieces).strip() + "\n"

def extract_text_from_page_details(page_details):
    """
    Build the text of one page from its token list.

    Args:
        page_details (dict): Page record; its "tokens" entry (default: empty
            list) holds dicts whose "text" value (default: "") is collected.

    Returns:
        str: Token texts joined by single spaces, passed through
        ``add_newline_after_colon_and_number`` and stripped.
    """
    token_texts = [tok.get("text", "") for tok in page_details.get("tokens", [])]
    joined = " ".join(token_texts)
    return add_newline_after_colon_and_number(joined).strip()

def process_pages_and_structure_text(pages_list):
    """
    Turn a list of page records into one sentence-per-line string per page.

    Args:
        pages_list (list): Page detail dicts, each accepted by
            ``extract_text_from_page_details``.

    Returns:
        list: One string per page, in input order, with the sentences found by
        ``nltk.sent_tokenize`` separated by "\\n".
    """
    return [
        "\n".join(nltk.sent_tokenize(extract_text_from_page_details(page)))
        for page in pages_list
    ]

# nltk.sent_tokenize (used while processing pages below) needs the Punkt
# tokenizer data. The only other download call lives in a later cell, so on a
# fresh kernel this cell would fail; fetch the data here before it is needed.
nltk.download('punkt')

# Load JSON data from the "output.json" file
# NOTE(review): no explicit encoding is given, so the platform default applies
# to both this read and the write below — confirm that is intended.
with open("output.json", "r") as file:
    data = json.load(file)

# Extract pages list (empty when the JSON has no "pages" key)
pages_list = data.get("pages", [])

# Process pages: one sentence-per-line string per page
extracted_texts = process_pages_and_structure_text(pages_list)

# Open the file in write mode and save the extracted texts
output_file = 'extracted_texts.txt'

with open(output_file, 'w') as file:
    # Pages are numbered from 1 in the output file
    for i, text in enumerate(extracted_texts, start=1):
        file.write(f"Page {i}:\n")
        file.write(f"{text}\n\n")

print("Extracted texts have been saved to 'extracted_texts.txt'.")

In [None]:
# Functions for creating index and chunks

def create_index(es_client, index_name):
    """
    (Re)create an index with a dense-vector "embedding" field.

    Any existing index with the same name is deleted first, so previously
    stored documents are lost.

    Args:
        es_client: Client object exposing ``indices.exists``,
            ``indices.delete`` and ``indices.create``.
        index_name (str): Name of the index to create.

    Returns:
        None
    """
    # 384 = dimensionality of the model's embeddings
    embedding_field = {"type": "dense_vector", "dims": 384}
    index_body = {"mappings": {"properties": {"embedding": embedding_field}}}

    already_present = es_client.indices.exists(index=index_name)
    if already_present:
        logging.info(f"Deleting existing index: {index_name}")
        es_client.indices.delete(index=index_name)

    logging.info(f"Creating index: {index_name}")
    es_client.indices.create(index=index_name, body=index_body)

def load_text_file(file_path):
    """
    Read a whole text file into memory.

    Args:
        file_path (str): Path of the file to read (opened in text mode with
            the platform's default encoding).

    Returns:
        str: The complete file contents.
    """
    logging.info(f"Loading text file: {file_path}")
    with open(file_path, 'r') as handle:
        return handle.read()

def chunk_text(text, chunk_size=200, overlap=100):
    """
    Split text into overlapping chunks of whitespace-separated words.

    Args:
        text (str): Text to split.
        chunk_size (int): Maximum number of words per chunk.
        overlap (int): Number of words shared by consecutive chunks; must be
            smaller than ``chunk_size`` whenever the text needs splitting.

    Returns:
        list: ``[text]`` unchanged when it has at most ``chunk_size`` words;
        otherwise the chunks, each a single-space-joined run of words starting
        every ``chunk_size - overlap`` words.

    Raises:
        ValueError: If the text needs splitting and ``overlap >= chunk_size``.
            Previously ``overlap > chunk_size`` silently returned an empty
            list (dropping all text) and ``overlap == chunk_size`` failed with
            an unrelated ``range()`` error.
    """
    logging.info(f"Chunking text of length {len(text)}")
    words = text.split()
    # Short texts are returned as-is (original whitespace preserved)
    if len(words) <= chunk_size:
        return [text]

    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    return [
        ' '.join(words[start:start + chunk_size])
        for start in range(0, len(words), step)
    ]

def generate_embeddings(text_chunks):
    """
    Encode text chunks with the notebook-level ``model``.

    Args:
        text_chunks (list): Chunks of text to embed.

    Returns:
        Whatever ``model.encode`` returns for ``text_chunks``.
    """
    chunk_count = len(text_chunks)
    logging.info(f"Generating embeddings for {chunk_count} chunks.")
    return model.encode(text_chunks)

def store_embeddings_in_elasticsearch(index_name, embeddings, text_chunks, file_key):
    """
    Index each (embedding, chunk) pair as one document via the global ``es``.

    Args:
        index_name (str): Target index.
        embeddings: Iterable of vectors exposing ``tolist()``.
        text_chunks (list): Text chunks, paired positionally with embeddings.
        file_key (str): Prefix for document ids ("<file_key>-<position>").

    Returns:
        None
    """
    position = 0
    for vector, passage in zip(embeddings, text_chunks):
        # Keep the raw text alongside its vector so search hits can return it
        es.index(
            index=index_name,
            id=f"{file_key}-{position}",
            document={"embedding": vector.tolist(), "text": passage},
        )
        position += 1

def create_chunks(file_path):
    """
    Load a text file, chunk it, embed the chunks and store them.

    Chunks are 100 words with a 50-word overlap and are written to the
    notebook-level ``index_name`` under the id prefix "file". Files that are
    empty or whitespace-only are skipped.

    Args:
        file_path (str): Path of the text file to process.

    Returns:
        None
    """
    text = load_text_file(file_path)
    # Nothing to index for empty / whitespace-only content
    if not (text and text.strip()):
        return

    pieces = chunk_text(text, chunk_size=100, overlap=50)
    vectors = generate_embeddings(pieces)
    store_embeddings_in_elasticsearch(index_name, vectors, pieces, "file")

In [None]:
# Text chunking and embedding: configure logging, connect to Elasticsearch,
# load the embedding model, then index the extracted text.

# Ensure the NLTK data is available
nltk.download('punkt')

# Set up logging
logging.basicConfig(level=logging.INFO)

# Set environment variables
# NOTE(review): these assignments overwrite any values already present in the
# process environment, and the credentials are hard-coded in the notebook —
# consider supplying them from outside the notebook instead.
os.environ['ELASTIC_HOST'] = 'xxx'
os.environ['ELASTIC_PORT'] = '31066'
os.environ['ELASTIC_USERNAME'] = 'xxx'
os.environ['ELASTIC_PASSWORD'] = 'xxx'
os.environ['ELASTIC_TLS_CERT'] = 'certificate.crt'

# Elasticsearch credentials (read back from the environment set just above)
ELASTIC_HOST = os.getenv("ELASTIC_HOST")
ELASTIC_PORT = os.getenv("ELASTIC_PORT")
ELASTIC_USERNAME = os.getenv("ELASTIC_USERNAME")
ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD")
ELASTIC_TLS_CERT = os.getenv("ELASTIC_TLS_CERT")

# Create an SSL context for Elasticsearch
# NOTE(review): the CA file is loaded, but hostname checking is then turned
# off and verify_mode is set to CERT_NONE, so the server certificate is not
# verified at all — confirm this is intentional.
ssl_context = ssl.create_default_context(cafile=ELASTIC_TLS_CERT)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Initialize the Elasticsearch client
# (username and password are embedded in the connection URL)
es = Elasticsearch(
    [f"https://{ELASTIC_USERNAME}:{ELASTIC_PASSWORD}@{ELASTIC_HOST}:{ELASTIC_PORT}"],
    ssl_context=ssl_context
)

# Load a pre-trained sentence transformer model
# (`model` is the global used by generate_embeddings)
logging.info("Loading SentenceTransformer model...")
model = SentenceTransformer('all-MiniLM-L6-v2')

# Specify the single text file path
file_path = "extracted_texts.txt"

# Recreate the target index (create_index deletes an existing index of the
# same name), then chunk, embed and store the text file's contents.
index_name = "embeddings_pdf"
create_index(es, index_name)

create_chunks(file_path)

In [None]:
# Prompt template for the chat model.
# Contains two str.format placeholders: {context_str} (the retrieved document
# text) and {query_str} (the user's question); get_prompt fills both.
# NOTE(review): the sentence "You answer must only answer the question asked."
# reads like a typo for "Your answer must..."; left untouched here because the
# text is sent to the model verbatim.
META_PROMPT_EN = """<|system|>
You are Granite Chat, an AI language model developed by IBM. You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior.
<|user|>

You are an AI powered assistant who has access to vector database.
You should help by providing accurate answers to questions.
You answer must only answer the question asked.
If you don't know the answer to a question, DO NOT share false information. Instead, say that you don't know the answer.

You are given the following information from knowledge base that should be your only source of information to answer the question.

[Document]

{context_str}

[End]

{query_str}

<|assistant|>"""

In [None]:
# Define functions for filling the prompt template, searching the index, and querying the LLM

def get_prompt(context_str, query_str):
    """
    Render META_PROMPT_EN with the retrieved context and the user's question.

    Args:
        context_str (str): Text substituted for the template's
            ``{context_str}`` placeholder.
        query_str (str): Text substituted for the ``{query_str}`` placeholder.

    Returns:
        str: The completed prompt.
    """
    fields = {"context_str": context_str, "query_str": query_str}
    return META_PROMPT_EN.format(**fields)

def query_model(model, query):
    """
    Generate text for a prompt, memoising answers in the global ``cache``.

    The cache is keyed by the prompt text only (not by model), so a repeated
    prompt returns the stored answer without calling the model again.

    Args:
        model: Object exposing ``generate(prompt)`` that returns a dict with a
            "results" list whose first item may carry "generated_text".
        query (str): Full prompt text sent to the model.

    Returns:
        str: The generated text, or "" when the response carries no results.
    """
    if query in cache:
        return cache[query]

    response = model.generate(query)  # Pass only the query text
    # "results" may be absent, None or an empty list; indexing [0] on an empty
    # list used to raise IndexError, so fall back to a single empty record.
    results = response.get('results') or [{}]
    generated_text = results[0].get('generated_text', '')
    cache[query] = generated_text
    return generated_text


def search_single_embeddings(index_name, query_embedding, top_k=3):
    """
    Run a cosine-similarity search against one index via the global ``es``.

    Args:
        index_name (str): Index to search.
        query_embedding: Query vector exposing ``shape`` and ``tolist()``.
        top_k (int): Maximum number of hits to return.

    Returns:
        The response returned by ``es.search``; only the "text" source field
        is requested for each hit.
    """
    logging.info(f"Query embedding shape: {query_embedding.shape}")
    response = es.search(
        # Fixed: this previously referenced the undefined name `indexname`,
        # which raised NameError on every call.
        index=index_name,
        body={
            "query": {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        # +1.0 shifts cosine similarity from [-1, 1] to a non-negative score
                        "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                        "params": {"query_vector": query_embedding.tolist()}
                    }
                }
            },
            "size": top_k,
            "_source": ["text"]
        }
    )
    logging.info(f"Elasticsearch response: {response}")
    return response

def search_similar_embeddings(index_list, query_embedding, top_k=3):
    """
    Run a cosine-similarity search over one or more indices via global ``es``.

    Args:
        index_list: Index specification handed to ``es.search`` unchanged
            (e.g. several index names separated by commas).
        query_embedding: Query vector exposing ``shape`` and ``tolist()``.
        top_k (int): Maximum number of hits to return.

    Returns:
        The response returned by ``es.search``; only the "text" source field
        is requested for each hit.
    """
    logging.info(f"Query embedding shape: {query_embedding.shape}")

    # Score every document by cosine similarity to the query vector
    similarity_script = {
        "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
        "params": {"query_vector": query_embedding.tolist()},
    }
    search_body = {
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": similarity_script,
            }
        },
        "size": top_k,
        "_source": ["text"],
    }

    response = es.search(index=index_list, body=search_body)
    logging.info(f"Elasticsearch response: {response}")
    return response

def run_rag_query(index_list, query):
    """
    Answer a question with retrieval-augmented generation.

    The question is embedded, the three most similar chunks are fetched from
    Elasticsearch, a prompt is built from them and sent to ``watson_model``.

    Args:
        index_list: Index specification forwarded to
            ``search_similar_embeddings``.
        query (str): The user's question.

    Returns:
        dict: ``{'response': <generated text>}`` on success, or
        ``{'error': 'Error processing retrieved document'}`` when the search
        result lacks the expected hit structure.
    """
    # Fixed: the encoder was referenced as `sentence_model`, a name that is
    # never defined; the SentenceTransformer is bound to the global `model`.
    query_embedding = model.encode([query])[0]
    search_result = search_similar_embeddings(index_list, query_embedding, top_k=3)

    try:
        hits = search_result['hits']['hits']
        # Collapse all whitespace runs inside each chunk to single spaces
        relevant_chunks = [' '.join(hit['_source']['text'].split()) for hit in hits]
        context_str = "\n\n".join(relevant_chunks)
        logging.info("Here are the most relevant chunks: %s", relevant_chunks)
    except (KeyError, IndexError) as e:
        logging.error(f"Error accessing document content: {e}")
        return {'error': 'Error processing retrieved document'}

    # Constructing the prompt with context
    prompt = get_prompt(context_str, query)
    logging.info(f"Generated prompt: {prompt}")

    # Generate response using the model
    generated_text = query_model(watson_model, prompt)
    formatted_response = f"Query: {query}\nGenerated Response: {generated_text}"
    logging.info(f"Final formatted response: {formatted_response}")

    return {'response': generated_text}


In [None]:
# Query processing, semantic search, and integration with the language model (LLM)


# Set environment variables for LLM
# NOTE(review): these assignments overwrite any values already present in the
# process environment, and the API key is hard-coded in the notebook.
os.environ['MODEL_ID'] = 'ibm/granite-13b-chat-v2'
os.environ['API_KEY'] = 'xxx'
os.environ['URL'] = 'xxx'
# Fixed: the closing quote was missing here, which made the whole cell a SyntaxError.
os.environ['PROJECT_ID'] = 'xxx'

# IBM Watson model configuration
generate_params = {
    'max_new_tokens': 1000  # Adjust based on model's limit and desired output length
}

# Initialize the IBM Watson Machine Learning model
watson_model = ModelInference(
    model_id=os.getenv('MODEL_ID'),
    credentials={"apikey": os.getenv('API_KEY'), "url": os.getenv('URL')},
    project_id=os.getenv('PROJECT_ID'),
    params=generate_params
)

# Simple cache to store responses (prompt text -> generated text; used by query_model)
cache = {}

In [None]:
# Example usage:
query = "What is ...?"
index_list="embeddings_xxx"
result = run_rag_query(index_list, query)
# run_rag_query returns {'error': ...} instead of {'response': ...} when the
# search hits cannot be read; indexing 'response' unconditionally would then
# raise KeyError and hide the real problem.
if 'response' in result:
    print(result['response'])
else:
    print(f"RAG query failed: {result.get('error')}")