# Generative AI - Prompt Agent - Loop

In [19]:
import os
from dotenv import load_dotenv
import openai
from langchain_openai import ChatOpenAI
# JSON loader
from langchain_community.document_loaders import JSONLoader
import json
from pathlib import Path
# Vector DB
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
# prompt
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import create_retrieval_chain, create_history_aware_retriever
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
# summarization
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.chains import LLMChain, MapReduceChain, load_summarize_chain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain_community.document_loaders import TextLoader

# 1. Static variable

In [20]:
vectorDB_declare = "faiss_vector_declare_index_db"
vectorDB_handle = "faiss_vector_handle_index_db"
vectorDB_exif_keyword = "faiss_vector_exif_keyword_index_db"
code_purpose_directory = "code_purpose"
code_purpose_file = code_purpose_directory+"//code_purpose.txt"
declare_code_block_path = "./declare_code_block"

In [21]:
class LLMAnswer:
    def __init__(self, app_name, answer, similar_app):
        self.app_name = app_name
        self.answer = answer
        self.similar_app = similar_app
    
    def to_json(self):
        return json.dumps(self.__dict__, indent=4)

# 2. Function

In [22]:
# Setup model
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
openai.api_key = api_key
llm = ChatOpenAI(model="gpt-4-turbo",temperature=0)
# Setup embedding
embeddings = OpenAIEmbeddings()
# loading database
db_connect = FAISS.load_local(vectorDB_declare, embeddings,allow_dangerous_deserialization=True)
print(db_connect.index.ntotal) 
#print(db_connect.index_to_docstore_id)

64495


In [23]:
# Function create directory
def create_directory(directory_path):
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
        print(f"Directory '{directory_path}' created successfully.")
    else:
        print(f"Directory '{directory_path}' already exists.")
# Funtion write string to text file
def write_string_to_file(string, file_path):
    with open(file_path, 'w') as file:
        file.write(string)
    print(f"String written to '{file_path}' successfully.")
# Function delete file
def delete_file(file_path):
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"File '{file_path}' deleted successfully.")
    else:
        print(f"File '{file_path}' does not exist.")
# Function list all file in directory to list
def list_files_in_directory(directory_path):
    file_list = os.listdir(directory_path)
    file_list = [file for file in file_list if os.path.isfile(os.path.join(directory_path, file))]
    return file_list
# Function load JSON
def load_json_data(file_path,json_root):
    # Define the JSONLoader with the appropriate parameters
    loader = JSONLoader(
        file_path=file_path,
        jq_schema="."+json_root+"[]",
        text_content=False
    )
    # Load the JSON data
    data = loader.load()
    return data
# Function get sub-string in string
def get_sub_string(input_string, delimiter='/'):
    parts = input_string.split(delimiter)
    filename = parts[-1]
    filename_without_extension = filename.split('.')[0]
    return filename_without_extension

# 3. Main

In [24]:
# Create directory
create_directory(code_purpose_directory)

Directory 'code_purpose' already exists.


In [25]:
# Create retriever
retriever = db_connect.as_retriever()
retriever

VectorStoreRetriever(tags=['FAISS', 'OpenAIEmbeddings'], vectorstore=<langchain_community.vectorstores.faiss.FAISS object at 0x7978dba5bd30>)

In [26]:
# List all files
declare_code_files = list_files_in_directory(declare_code_block_path)

In [27]:
# Load data
declare_code_file = declare_code_block_path+"/"+declare_code_files[0]
print("File name :"+declare_code_file)
declare_code_documents = load_json_data(declare_code_file,"declare")
# metadata input
metadata_input = (declare_code_documents[0].metadata)["source"]
print("Metadata input: ",metadata_input)
app_name = get_sub_string(metadata_input)
print("App name: ",app_name)
# Declare similar_apps list
similar_apps = []
llm_response = []
# for i in range(len(declare_code_documents)):
for i in range(80,90):
    print("******************** Document page_content no-"+str(i)+" ********************")
    document = declare_code_documents[i]
    print("Document: ",document)
    query = declare_code_documents[i].page_content
    print("Query:", query)
    docs_similar = retriever.invoke(query)
    print("Number of similar: ",len(docs_similar))
    print("Docs similar: ",docs_similar)
    for j in range(len(docs_similar)):
        similar_metadata = (docs_similar[j].metadata)["source"]
        similar_app = get_sub_string(similar_metadata)
        similar_apps.append(similar_app)
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
    similar_apps = list(set(similar_apps))
    print("Similar app is a list", str(similar_apps))
    prompt = ChatPromptTemplate.from_messages([
        MessagesPlaceholder(variable_name="chat_history"),
        ("user","{input}"),
        ("user","Given the above programming code block, generate a search query to look up information relevant to the conversation")
    ])
    # print("prompt: ",prompt)
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++")
    prompt = ChatPromptTemplate.from_messages(
        [
            ("placeholder", "{chat_history}"),
            ("user", "{input}"),
            (
                "user",
                "Given the above conversation, generate a search query to look up to get information relevant to the conversation",
            ),
        ]
    )

    retriever_chain = create_history_aware_retriever(llm, retriever, prompt)

    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "Answer the user's questions based on the below context:\n\n{context}",
            ),
            ("placeholder", "{chat_history}"),
            ("user", "{input}"),
        ]
    )
    document_chain = create_stuff_documents_chain(llm, prompt)
    qa = create_retrieval_chain(retriever_chain, document_chain)
    question = f"""
    What is Exif metadata in this list (Make, Model, Software, GPS, Datetime) included in this code block below? \n
    {docs_similar} \n
    If EXIF is not found in the code block, the simple answer is "the code block is unrelated to any kind of EXIF metadata."
    """
    print("---------------------START QUESTION---------------------")
    print("Question: ",question)
    result = qa.invoke({"input": question})
    code_purpose = result["answer"]
    print("---------------------END QUESTION---------------------")
    print("---------------------START CODE PURPOSE---------------------")
    print(code_purpose)
    print("---------------------END CODE PURPOSE---------------------")
    write_string_to_file(code_purpose, code_purpose_file)
    # Summary
    prompt_template = """
    You are provided with a paragraph listing the types of EXIF metadata in the code block.
    ----------
    {text}
    ----------
    Question: Examine the paragraph and summarize which types of EXIF metadata the above paragraph relates to in the inclusion list (Make, Model, Software, GPS, Datetime) if EXIF metadata exists. 
    Please only respond with the EXIF metadata name in this list (Make, Model, Software, GPS, Datetime). If you can't find it, answer "No".
    EXIF metadata name:
    """
    # print(prompt_template)
    prompt = PromptTemplate.from_template(prompt_template)
    prompt = PromptTemplate(template=prompt_template,input_variables=["text"])
    stuff_chain = load_summarize_chain(llm,
                                   chain_type="stuff",
                                   prompt=prompt,
                                   verbose = True
                                  )
    print(stuff_chain.llm_chain.prompt.template)
    loader = TextLoader(code_purpose_file)
    code_purpose_docs = loader.load()
    print("===============================SUMMARY PURPOSE===============================")
    print("Code purpose docs: ",code_purpose_docs)
    output_summary = stuff_chain.invoke(code_purpose_docs)
    # output_summary
    answer = output_summary["output_text"]
    llm_answer_instance = LLMAnswer(app_name, answer, similar_apps)
    print("LLMAnswer as JSON:", llm_answer_instance.to_json())
    delete_file(code_purpose_file)
    #break

File name :./declare_code_block/Gallery-Photo-Gallery-App-1.0_declare.json
Metadata input:  /root/metaLeak-ml-llm-rag-ubuntu/declare_code_block/Gallery-Photo-Gallery-App-1.0_declare.json
App name:  Gallery-Photo-Gallery-App-1
******************** Document page_content no-80 ********************
Document:  page_content='t = new byte[] { 109, 105, 102, 49 };' metadata={'source': '/root/metaLeak-ml-llm-rag-ubuntu/declare_code_block/Gallery-Photo-Gallery-App-1.0_declare.json', 'seq_num': 81}
Query: t = new byte[] { 109, 105, 102, 49 };
Number of similar:  4
Docs similar:  [Document(page_content='t = new byte[] { 109, 105, 102, 49 };', metadata={'source': '/root/metaLeak-ml-llm-rag-ubuntu/declare_code_block/Camera-For-Sasmung-S22-Ultra-2.0_declare.json', 'seq_num': 89}), Document(page_content='t = new byte[] { 109, 105, 102, 49 };', metadata={'source': '/root/metaLeak-ml-llm-rag-ubuntu/declare_code_block/Camera-for-Oppo-Reno-6-Selfie-Expert-Camera-1.6_declare.json', 'seq_num': 98}), Documen