# Cohere RAG with dense retriever and ReRank model
- references: https://docs.cohere.com/v2/docs/rag-complete-example
- [ ] check performance with SciFact
<br>
“This work was supported by compute credits from a Cohere Labs Research Grant, these grants are designed to support academic partners conducting research with the goal of releasing scientific artifacts and data for good projects.”

In [1]:
import cohere
import numpy as np
from typing import List, Tuple, Dict
import os
from dotenv import load_dotenv
import json
import time # for timing functions
import sys
from colorama import Fore, Style, Back
import random


In [2]:
# load secret from local .env file
def get_key():
    """
    Fetch the Cohere API key for this session.

    Calls load_dotenv() and then reads the COHERE_API_KEY environment
    variable. Prints a green confirmation and returns the key when it is set;
    prints a red warning and returns None when it is not.
    """
    # load_dotenv() reads the local secret .env file
    load_dotenv()
    api_key = os.environ.get('COHERE_API_KEY')

    # guard clause: nothing to return when the variable is absent
    if api_key is None:
        print(Fore.LIGHTRED_EX + "API Key is missing")
        return None

    print(Fore.GREEN + "all is good, beautiful!")
    return api_key

# initialize the Cohere v2 client with the API key returned by get_key()
co = cohere.ClientV2(get_key())
        

[32mall is good, beautiful!


In [3]:
# load documents
# read documents from the .txt files in the data directory
def read_documents_with_doi(directory_path: str) -> List[str]:
    """
    Reads every ".txt" file in a directory into one string per document.

    The first line of each file is used as the DOI line and the remaining
    lines as the document text. Files with no lines at all are skipped, and
    files without the ".txt" extension are ignored.

    Args:
        directory_path (str): Path to the directory containing the document files.

    Returns:
        List[str]: One string per document, in sorted filename order. Each
        string is the file's first line (including its line break), a single
        space, the stripped remaining text and a final line break, so the DOI
        line is always the first line of the entry.
    """

    documents_with_doi = []
    # os.listdir() returns names in arbitrary order; sort so the corpus order
    # is the same on every run and every machine
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith(".txt"):
            continue
        file_path = os.path.join(directory_path, filename)
        with open(file_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
        if not lines:
            # an empty file has no DOI line, so there is nothing to index
            continue
        doi = lines[0]
        text = "".join(lines[1:]).strip()
        documents_with_doi.append(f"{doi} {text}\n")
    return documents_with_doi

# module-level default: an empty list of query strings
# (the debugging cell assigns the actual queries before running the pipeline)
search_queries = []
# Embed the documents
def document_embed(documents: List[str]) -> List[List[float]]:
    """
    Embeds the documents from the list returned by read_documents_with_doi().

    All documents are sent in a single co.embed() request with
    input_type="search_document".

    Args:
        documents (List[str]): The document strings to embed.

    Returns:
        List[List[float]]: The float embeddings of the response; per the
        Cohere embed API this is one vector per input text, in input order.

    NOTE: Change the model based on the test condition
    """
    doc_emb = co.embed(
        #model="embed-v4.0",
        model="embed-english-v3.0",
        input_type="search_document",
        texts=list(documents),
        embedding_types=["float"],
        ).embeddings.float
    return doc_emb

# Embed the search query
def query_embed(search_queries: List[str]) -> List[List[float]]:
    """
    Embeds the queries in the search_queries list.

    All queries are sent in a single co.embed() request with
    input_type="search_query".

    Args:
        search_queries (List[str]): The query strings to embed.

    Returns:
        List[List[float]]: The float embeddings of the response; per the
        Cohere embed API this is one vector per query, in input order.

    NOTE: change model depending on test condition
    """
    query_emb = co.embed(
        #model="embed-v4.0",
        model="embed-english-v3.0",
        input_type="search_query",
        texts=search_queries,
        embedding_types=["float"],
        ).embeddings.float
    return query_emb

# retrieve top_k and compute similarity using dot product
def retrieve_top_k(top_k, query_embedded, documents_embedded, documents)->List[str]:
    """
    Rank the documents against the first query embedding by dot product and
    return the top_k highest scoring documents, best match first.
    """
    # one row of similarity scores per query; only the first query's row is used
    similarity = np.dot(query_embedded, np.transpose(documents_embedded))
    first_query_scores = similarity[0]

    # negating the scores makes the ascending argsort list the best match first
    ranking = np.argsort(-first_query_scores)

    # map the winning positions back onto the original documents
    return [documents[position] for position in ranking[:top_k]]

def rerank_documents(retrieved_documents,search_queries,threshold,top_k)->Tuple[List[str], List[Tuple[float, str]]]:
    """
    takes retrieved_documents as input along with search_queries and runs them through the 
    rerank model from cohere for semantic similarity. Only the first query in
    search_queries is used.

    top_n = top_k
    Limits those returned by a threshold score. this is to reduce those that are irrelevant.

    Args:
        retrieved_documents: document strings whose first line is the DOI line.
        search_queries: list of query strings; search_queries[0] is the rerank query.
        threshold: minimum relevance_score a result needs to be kept.
        top_k: passed to the rerank call as top_n.

    Returns:
        A tuple (reranked_docs, reranked_with_score):
        reranked_docs is the list of kept documents in the order returned by
        the rerank call; reranked_with_score is a list of
        (relevance_score, DOI) tuples for the same documents.

    NOTE: change the model based on the test condition
    """
    def _doi_of(document):
        # The DOI line is the first line of the document. Remove the literal
        # "DOI:" prefix only: str.strip("DOI: ") treats its argument as a SET
        # of characters and would also eat a trailing D, O, I or ':' that
        # belongs to the DOI itself.
        first_line = document.split("\n")[0].strip()
        if first_line.startswith("DOI:"):
            first_line = first_line[len("DOI:"):]
        return first_line.strip()

    # Rerank the documents
    results = co.rerank(
        #model="rerank-v3.5",
        model="rerank-english-v3.0",
        query=search_queries[0],
        documents=list(retrieved_documents),
        top_n=top_k,
        max_tokens_per_doc=4096,# defaults to 4096
    )

    # Display the reranking results
    #for idx, result in enumerate(results.results):
    #    print(f"Rank: {idx+1}")
    #    print(f"Score: {result.relevance_score}")
    #    print(f"Document: {retrieved_documents[result.index]}\n")

    # keep only the results at or above the threshold (filtered once, used twice)
    kept = [result for result in results.results if result.relevance_score >= threshold]
    reranked_docs = [retrieved_documents[result.index] for result in kept]
    reranked_with_score = [
        (result.relevance_score, _doi_of(retrieved_documents[result.index])) for result in kept
    ]

    print(f"reranked_documents: {reranked_docs}")
    print(f"length of reranked_documents: {len(reranked_docs)}")

    return reranked_docs, reranked_with_score

def cohere_rag_pipeline(directory_path,search_queries,top_k,threshold):
    """
    Run one retrieve -> rerank -> generate pass for the first query in search_queries.

    Steps: read the corpus from directory_path, shuffle it, embed the documents
    and the queries, keep the top_k documents by dot-product similarity, rerank
    them (dropping results below threshold) and hand the remaining documents to
    the chat model together with the query and the system instructions.

    Returns:
        A tuple (chat response, reranked documents, (score, DOI) tuples) where
        the last two items are the return values of rerank_documents().
    """
    # load the corpus and report its size
    corpus = read_documents_with_doi(directory_path)
    print(f"Length of documents: {len(corpus)}")

    # randomize the order of the corpus in place
    random.shuffle(corpus)

    # dense retrieval: embed the corpus and the query, then keep the top_k matches
    corpus_vectors = document_embed(corpus)
    query_vectors = query_embed(search_queries)
    candidates = retrieve_top_k(top_k, query_vectors, corpus_vectors, corpus)

    # second stage: Cohere Rerank model plus threshold filter
    reranked_documents, reranked_DOIs_with_score = rerank_documents(candidates,search_queries,threshold,top_k)

    # system instructions for the chat model
    instructions = """
                    You are an academic research assistant.
                    You must include the DOI in your response.
                    If there is no content provided, ask for a different question.
                    Please structure your response like this:
                    Summary: summary statement here. 
                    DOI: summary of the text associated with this DOI.
                    Address me as, 'my lady'.
                    """
    # the user turn carries the first query; the system turn carries the instructions
    user_message = {"role":"user", "content": search_queries[0]}
    system_message = {"role":"system", "content":instructions}

    # Generate the response NOTE: change the model for the test condition!
    resp = co.chat(
        model="command-a-03-2025",
        messages=[user_message, system_message],
        documents=reranked_documents,
    )

    return resp, reranked_documents, reranked_DOIs_with_score




## debugging 

In [None]:
#  run here to test the functions above
# ****** Pipeline ********
# set directory path
directory_path = "/Users/poppyriddle/Documents/PhD/Research_proposal/Part_3/part_3_cohere/data"
# initialize search_queries (the pipeline only uses the first entry of the list)
search_queries = [input("what is your query?")]#could be a list of multiple queries
# set top_k
top_k = 5
#set threshold 
threshold = 0.1

response, reranked_documents_end, reranked_DOIs_with_score_end = cohere_rag_pipeline(directory_path,search_queries,top_k,threshold)
# Display the response
print(Fore.LIGHTMAGENTA_EX + f"{response.message.content[0].text}")
print(Fore.LIGHTCYAN_EX + "------\nReranked documents:")
for doc in reranked_documents_end:
    print(doc)

# Display the citations and source documents
if response.message.citations:
    print(Fore.LIGHTYELLOW_EX + "\nCITATIONS:")
    for citation in response.message.citations:
        # First line of the cited document's content (the DOI line). It is
        # computed outside the f-string because reusing double quotes or a
        # backslash inside an f-string expression is a SyntaxError before
        # Python 3.12.
        source_first_line = citation.sources[0].document.get('content').split("\n")[0]
        print(f"source text: {citation.text},\nsource: {source_first_line}\n------")

# Analysis
Precision, recall, accuracy, F1 scores and faithfulness
## Precision, recall, F1 score
### references
- https://scikit-learn.org/stable/auto_examples/model_selection/plot_precision_recall.html
- https://vitalflux.com/accuracy-precision-recall-f1-score-python-example/


In [4]:
from typing import List, Dict, Set
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, balanced_accuracy_score
import openpyxl
import pandas as pd
import numpy as np
from colorama import Fore, Back, Style

## automated version 
Currently Works!


In [5]:
# Create the empty dataframe that collects one row of results per query.
# RUN THIS ONLY ONCE, AT THE BEGINNING OF THE ANALYSIS PROCEDURE: re-running it
# replaces results_df with a new empty frame AND ERASES THE PREVIOUS RESULTS!!

result_columns = [
    'Query',
    'Precision',
    'Recall',
    'F1-Score',
    'Accuracy',
    'Balanced accuracy',
    'Faithfulness score',
    'Documents score',
    'Response',
]
results_df = pd.DataFrame(columns=result_columns)
results_df


Unnamed: 0,Query,Precision,Recall,F1-Score,Accuracy,Balanced accuracy,Faithfulness score,Documents score,Response


In [6]:
# Load the golden set of test queries from the Excel workbook in the working directory.
golden_set_path = "golden_set.xlsx"
golden_set_df = pd.read_excel(golden_set_path)
# optional: work on the first three rows only / display the frame
#golden_set_df_test = golden_set_df.head(3)
#golden_set_df
print(Fore.LIGHTGREEN_EX + " Golder set loaded!")


[92m Golder set loaded!


In [7]:
#run the test from here


#***** Begin chat session *****
# set directory path (uncomment exactly one corpus)
#directory_path = "/Users/poppyriddle/Documents/PhD/Research_proposal/Part_3/part_3_cohere/data"
#directory_path = "/Users/poppyriddle/Documents/PhD/Research_proposal/Part_3/part_3_cohere/data_jats"
directory_path = "/Users/poppyriddle/Documents/PhD/Research_proposal/Part_3/part_3_cohere/data_multi_lang_ja"

# read documents and dois from the directory path
documents_with_doi = read_documents_with_doi(directory_path)
# Each corpus entry is ONE string whose first line is the DOI line, so the
# document text is everything after that first line. (Indexing doc[0] would
# only give the first character of the string and always produce an empty list.)
documents = ["\n".join(doc.split('\n')[1:]).strip() for doc in documents_with_doi]
print(f"Length of documents: {len(documents)}")
print(f"Length of corpus: {len(documents_with_doi)}")

# Countdown function
def countdown(seconds:int)->None:
    """
    Print a one-tick-per-second countdown from seconds down to 1 on a single
    console line, then print a finished message.
    """
    remaining = seconds
    while remaining > 0:
        # padding plus carriage return: the next value overwrites this one
        tick = Fore.LIGHTMAGENTA_EX + f"{remaining}"
        print(tick, end='      \r')
        time.sleep(1)  # one tick per second
        remaining -= 1
    # the loop is skipped entirely for zero or negative input
    print("The time has come!")

def evaluate_retrieval(retrieved_dois, ground_truth, response, query:str,reranked_DOIs_with_score_end)->Dict:
    """
    Scores the DOIs retrieved for one query against the ground truth.

    Every entry of the module-level documents_with_doi corpus becomes one
    binary label: y_true is 1 when the entry's DOI is in ground_truth and
    y_pred is 1 when it is in retrieved_dois, otherwise 0. Both label vectors
    therefore have the length of the corpus.

    Args:
        retrieved_dois: DOIs of the documents returned by the pipeline.
        ground_truth: container (or string) of the relevant DOIs; membership
            is tested with the "in" operator.
        response: chat response; response.message.content[0].text is used.
        query: the query, stored in the result via f-string formatting.
        reranked_DOIs_with_score_end: (score, DOI) tuples, stored as a string.

    Returns:
        Dict with the keys 'Query', 'Precision', 'Recall', 'F1-Score',
        'Accuracy', 'Balanced accuracy', 'Faithfulness score',
        'Documents score' and 'Response'.
    """
    def _doi_of(document):
        # The DOI line is the first line of the document. Remove the literal
        # "DOI:" prefix only: lstrip("DOI: ") treats its argument as a SET of
        # characters rather than a prefix.
        first_line = document.split("\n")[0].strip()
        if first_line.startswith("DOI:"):
            first_line = first_line[len("DOI:"):]
        return first_line.strip()

    # documents_with_doi is a module-level variable holding the whole corpus
    corpus_doi_list = [_doi_of(document) for document in documents_with_doi]
    print(len(corpus_doi_list))

    # one binary label per corpus document, so len(y_true) == len(corpus_doi_list)
    y_true = np.array([1 if doi in ground_truth else 0 for doi in corpus_doi_list])
    y_pred = [1 if doi in retrieved_dois else 0 for doi in corpus_doi_list]

    # Score the positive ("relevant") class only. With binary labels,
    # average="micro" pools both classes, which makes precision, recall and F1
    # all collapse to plain accuracy. zero_division=0 keeps the score at 0
    # (without a warning) when nothing was retrieved or nothing is relevant.
    precision = precision_score(y_true, y_pred, average="binary", zero_division=0)
    recall = recall_score(y_true, y_pred, average="binary", zero_division=0)
    f1 = f1_score(y_true, y_pred, average="binary", zero_division=0)
    accuracy = accuracy_score(y_true, y_pred, normalize=True)
    balanced_accuracy = balanced_accuracy_score(y_true, y_pred)

    # faithfulness: number of retrieved DOIs that literally appear in the response text
    response_text = response.message.content[0].text
    faithfulness_score = sum(1 for doi in retrieved_dois if doi in response_text)

    return {
        'Query':f"{query}",
        "Precision": precision,
        "Recall": recall,
        "F1-Score": f1,
        "Accuracy":accuracy,
        "Balanced accuracy":balanced_accuracy,
        "Faithfulness score":faithfulness_score,
        "Documents score":str(reranked_DOIs_with_score_end),
        "Response":response_text
    }

def print_results(retrieved_dois, ground_truth, response, query:str, reranked_DOIs_with_score_end)->Dict:
    """
    Runs evaluate_retrieval() for one query, prints its metrics in a fixed
    order and returns the results dictionary unchanged.
    """
    results = evaluate_retrieval(retrieved_dois, ground_truth, response, query, reranked_DOIs_with_score_end)

    print(f"For query: {results['Query']}:")
    # numeric metrics are shown with three decimals
    for metric in ("Precision", "Recall", "F1-Score", "Accuracy", "Balanced accuracy"):
        print(f"{metric}: {results[metric]:.3f}")
    # the remaining fields are printed as they are
    for field in ("Faithfulness score", "Documents score"):
        print(f"{field}: {results[field]}")
    return results

#print_results()

def cohere_test_loop(query:List[str],ground_truth:List[str]):
    """
    Runs the RAG pipeline for one golden-set row, evaluates the result and
    saves the accumulated results.

    The new metrics row is appended to the module-level results_df; after
    every call both result workbooks are rewritten, and the function then
    waits 10 seconds as a rate limit.

    Args:
        query: the query wrapped in a list, because cohere_rag_pipeline()
            reads the query as search_queries[0].
        ground_truth: the relevant DOIs for the query, passed through to
            print_results().

    Returns:
        The module-level results_df including the new row.
    """
    def _doi_of(document):
        # The DOI line is the first line of the document. Remove the literal
        # "DOI:" prefix only: str.strip("DOI: ") treats its argument as a SET
        # of characters and would also eat a trailing D, O, I or ':' that
        # belongs to the DOI itself.
        first_line = document.split("\n")[0].strip()
        if first_line.startswith("DOI:"):
            first_line = first_line[len("DOI:"):]
        return first_line.strip()

    # set top_k
    top_k = 5
    #set threshold 
    threshold = 0.10

    response, reranked_documents_end, reranked_DOIs_with_score_end = cohere_rag_pipeline(directory_path,query,top_k,threshold)
    
    # Extract DOIs from retrieved documents
    retrieved_dois = [_doi_of(doc) for doc in reranked_documents_end]
    print("Retrieved DOIs:", retrieved_dois)

    # Display the response
    print(Fore.LIGHTYELLOW_EX + f"{response.message.content[0].text}")

    new_result = print_results(retrieved_dois, ground_truth, response, query, reranked_DOIs_with_score_end)
    # add the new result to the df
    results_df.loc[len(results_df)] = new_result

    #save the queries and responses to separate dataframe to be manually annotated
    answer_relevance_df = results_df[['Query','Response']].copy(deep=True)

    answer_relevance_filename = "analysis/Round2/results/english_multi_lang_answer_relevance_results.xlsx"
    analysis_filename = "analysis/Round2/results/english_multi_lang_analysis_results.xlsx"

    # create the output directories BEFORE the first write, otherwise the
    # first to_excel() fails when the results directory does not exist yet
    for filename in (answer_relevance_filename, analysis_filename):
        os.makedirs(os.path.dirname(filename), exist_ok=True)

    # save out answer_relevance_df and the full results
    answer_relevance_df.to_excel(answer_relevance_filename)
    results_df.to_excel(analysis_filename)

    # rate limit functions
    seconds = 10
    print(Fore.LIGHTRED_EX + f"Waiting for {seconds} seconds...")
    countdown(seconds)
    
    return results_df

#golden_set_df_test['Response\nDense'] = golden_set_df_test.apply(lambda x: test_loop(x.query,x.ground_truth), axis=1)
# pull the two golden-set columns out as plain Python lists
golden_set_df_query = golden_set_df['query'].to_list()
golden_set_df_ground_truth = golden_set_df['ground_truth'].to_list()

# run the complete golden set five times; loop_length counts down from 5 to 1
loop_length = 5
while loop_length > 0:
    for i, (query_text, expected_dois) in enumerate(zip(golden_set_df_query, golden_set_df_ground_truth)):
        # the pipeline expects the query wrapped in a list
        cohere_test_loop([query_text], expected_dois)
        # progress message, printed once the row has been processed
        print(Fore.LIGHTCYAN_EX + f"Working on row: {i} in loop: {loop_length}")
    loop_length -= 1

print(Fore.LIGHTMAGENTA_EX + "!!!!! All Done!!!!!")

    

Length of documents: 96
Length of corpus: 96
Length of documents: 96
reranked_documents: ["DOI: 10.1109/ADL.1998.670425\n Title: メタデータの品質の評価: 米国政府情報検索サービス (GILS) の評価から得られた結果と方法論上の考慮事項 Assessing metadata quality: findings and methodological considerations from an evaluation of the US Government Information Locator Service (GILS)\nAbstract: メタデータ レコードを評価するための定性的および定量的なコンテンツ分析手法の適用について説明します。米国連邦政府機関による政府情報検索サービス (GILS) の実装に関する大規模な評価研究の一部として、このメタデータ評価では、メタデータの品質に関する探索的調査のための一連の基準と手順が開発されました。著者らは、記録コンテンツ分析とその他のいくつかの方法を使用して、GILS が政府機関が情報の配布と管理の責任を果たすのに役立っているかどうか、また GILS がユーザーの期待にどの程度応えているかを調査しました。記載された探索的分析に基づいて、著者らは、さまざまな種類のメタデータ (例: 記述的、トランザクション的など) を評価するには、さまざまな基準と手順が必要になる可能性があると結論付けています。 GILS の大規模な評価研究をサポートすることに加えて、メタデータ コンテンツのこの分析結果は、メタデータの品質の評価に関する対話の発展に貢献します。 Discusses the application of qualitative and quantitative content analysis techniques to assess metadata records. As a component of a larger evaluation study of US Federal agencies' implementation of the Government Information Lo