In [None]:
# Install the pinned Python dependencies used by this notebook.
!pip install langchain==0.1.16 openai==1.14.2 langchain-openai==0.1.1 langchain-community==0.0.34 faiss-gpu==1.7.2 sentence_transformers==2.7.0 thefuzz==0.20.0 huggingface-hub==0.23.0
# Build llama-cpp-python from source (FORCE_CMAKE=1) with the LLAMA_CUBLAS CMake option switched on.
!CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python==0.2.63

In [None]:
import ast
import json
import os
import re
import tarfile
from datetime import datetime

import pandas as pd
import requests
import shutil
from huggingface_hub import hf_hub_download, logging as hf_logging
from langchain.document_loaders.dataframe import DataFrameLoader
from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings
from langchain_community.llms import LlamaCpp
from langchain_community.vectorstores import FAISS
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from thefuzz import fuzz

In [None]:
def filter_references(df_row, key, values):
    """Return True if any reference in ``df_row`` has ``key`` set to one of ``values``.

    Args:
        df_row: Iterable of dict-like references (one cell of the annotations column).
            A missing cell (``None``, NaN or any other non-iterable value) is treated
            as "no references" instead of raising.
        key: Key looked up in every reference.
        values: Container with the accepted values for ``key``.

    Returns:
        bool: Whether at least one reference carries an accepted value.
    """
    try:
        references = iter(df_row)
    except TypeError:
        # Missing cell (None / NaN): nothing can match
        return False

    # Entries without a ``get`` method cannot hold the key; skip them instead of crashing
    return any(hasattr(reference, 'get') and reference.get(key, None) in values for reference in references)

def query_nearest_concepts(df_row, db, source_concept_column, target_candidate_column, initial_candidates_k, target_id_column, source_id_column, source_id_key):
    """Retrieve the nearest target concepts for one source row and check reference coverage.

    Args:
        df_row: Source row (supports ``row[column]`` access).
        db: Vector store exposing ``similarity_search_with_score(query, k=...)`` and
            returning ``(document, score)`` pairs.
        source_concept_column: Column of ``df_row`` holding the query text.
        target_candidate_column: Metadata key holding the candidate label; the document's
            ``page_content`` is used when the key is missing or empty.
        initial_candidates_k: Number of neighbours to retrieve.
        target_id_column: Metadata key holding the candidate ID.
        source_id_column: Column of ``df_row`` holding the list of reference dicts.
        source_id_key: Key of each reference dict that holds the reference ID.

    Returns:
        tuple: ``(candidates, initial_matched)`` where ``candidates`` is a list of
        ``(id, concept, score)`` tuples and ``initial_matched`` tells whether any
        candidate ID is one of the reference IDs.
    """
    # Query nearest vectors in store
    candidates = db.similarity_search_with_score(df_row[source_concept_column], k=initial_candidates_k)

    # Extract IDs from results
    target_ids = {candidate[0].metadata.get(target_id_column) for candidate in candidates}

    # Extract reference IDs from source; a missing cell (None / NaN) means "no references".
    # The guard has to run BEFORE iterating, otherwise a missing cell raises TypeError.
    references = df_row[source_id_column]
    if not isinstance(references, (list, tuple)):
        references = []
    reference_ids = {reference.get(source_id_key) for reference in references}

    # Check candidate coverage
    initial_matched = bool(target_ids & reference_ids)

    # Return candidates (ID, concept, score)
    return (
        [(candidate[0].metadata.get(target_id_column), candidate[0].metadata.get(target_candidate_column) or candidate[0].page_content, candidate[1])
            for candidate in candidates],
        initial_matched,
    )

def filter_candidates(df_row, filtering_method, source_concept_column, filtered_candidates_k, source_id_column, source_id_key):
    """Narrow the initial candidates down to the ones inserted into the LLM prompt.

    Args:
        df_row: Source row; must contain ``'initial_candidates'`` as a list of
            ``(id, concept, score)`` tuples.
        filtering_method: ``'embeddings'`` keeps the first K candidates in their current
            order; ``'fuzzy_ratio'`` re-scores candidates with ``fuzz.ratio`` against the
            lower-cased source text and keeps the K best. Any other value keeps nothing.
        source_concept_column: Column of ``df_row`` holding the source text.
        filtered_candidates_k: Number of candidates to keep.
        source_id_column: Column of ``df_row`` holding the list of reference dicts.
        source_id_key: Key of each reference dict that holds the reference ID.

    Returns:
        tuple: ``(filtered_candidates, filtered_matched)`` where ``filtered_matched`` tells
        whether any kept candidate ID is one of the reference IDs.
    """
    # Unknown methods intentionally fall through with an empty candidate list
    filtered_candidates = []
    # Filter candidates according to selected method
    if filtering_method == 'embeddings':
        # Embeddings distance
        filtered_candidates = df_row['initial_candidates'][:filtered_candidates_k]
    elif filtering_method == 'fuzzy_ratio':
        # Fuzzy ratio
        filtered_candidates = [(candidate[0], candidate[1],
                                fuzz.ratio(df_row[source_concept_column].lower(), candidate[1].lower()))
                                for candidate in df_row['initial_candidates']]
        filtered_candidates = sorted(filtered_candidates, key=lambda x: x[2], reverse=True)[:filtered_candidates_k]

    # Extract IDs from filtered candidates
    target_ids = {candidate[0] for candidate in filtered_candidates}

    # Extract reference IDs from source; a missing cell (None / NaN) means "no references".
    # The guard has to run BEFORE iterating, otherwise a missing cell raises TypeError.
    references = df_row[source_id_column]
    if not isinstance(references, (list, tuple)):
        references = []
    reference_ids = {reference.get(source_id_key) for reference in references}

    # Check candidate coverage
    filtered_matched = bool(target_ids & reference_ids)

    # Return filtered candidates (ID, concept, score)
    return (
        filtered_candidates,
        filtered_matched,
    )

def call_llm_rag(df_row, llm, template, source_selection_column, source_id_column, source_id_key, source_equivalence_key):
    """Ask the LLM to choose one of the filtered candidates and score its answer.

    Args:
        df_row: Source row; must contain ``'filtered_matched'`` and ``'filtered_candidates'``
            (list of ``(id, concept, score)`` tuples).
        llm: Object exposing ``invoke(prompt)``. The result may be a plain string or a
            message-like object with a ``content`` attribute. Token counters are read from
            ``llm.prompt_tokens`` / ``completion_tokens`` / ``total_tokens`` when present
            and default to 0 otherwise.
        template: Object whose ``format(source_concept=..., target_candidates=...)`` builds
            the prompt.
        source_selection_column: Column of ``df_row`` holding the term shown to the LLM.
        source_id_column: Column of ``df_row`` holding the list of reference dicts.
        source_id_key: Key of each reference dict that holds the reference ID.
        source_equivalence_key: Key of each reference dict that holds the equivalence type.

    Returns:
        tuple: ``(llm_output, prompt, llm_concept, llm_equivalence, partial_match,
        exact_match, prompt_tokens, completion_tokens, total_tokens)``. ``partial_match``
        means only the concept ID matched a reference; ``exact_match`` means concept ID and
        equivalence type both matched the same reference.
    """
    valid_equivalences = ['EQUAL', 'EQUIVALENT', 'NARROWER', 'WIDER', 'INEXACT']

    # Rows whose references are not among the candidates are not sent to the LLM
    if not df_row['filtered_matched']:
        return '', '', None, None, False, False, 0, 0, 0

    # Build candidates list for LLM prompt
    target_candidates = '\n'.join([f"{candidate[0]} - {candidate[1]}" for candidate in df_row['filtered_candidates']][:50])

    prompt = template.format(**{'source_concept': df_row[source_selection_column].strip(),
                                'target_candidates': target_candidates})

    try:
        llm_output = llm.invoke(prompt)
    except Exception:
        # Best effort: a failed call is recorded as an empty answer.
        # ``Exception`` (not a bare except) so that Ctrl+C still interrupts a long run.
        return '', prompt, None, None, False, False, 0, 0, 0

    # Chat models return a message object instead of a string; keep only its text
    if llm_output is not None and not isinstance(llm_output, str):
        llm_output = getattr(llm_output, 'content', llm_output)
        if llm_output is not None and not isinstance(llm_output, str):
            llm_output = str(llm_output)

    # Not every LLM wrapper tracks usage counters; fall back to 0 when they are absent
    usage = tuple(getattr(llm, counter, 0) for counter in ('prompt_tokens', 'completion_tokens', 'total_tokens'))

    if not llm_output:
        return (llm_output, prompt, None, None, False, False) + usage

    # Extract concept ID and equivalence type from output
    llm_concept, llm_equivalence = '', ''
    try:
        llm_output_json = json.loads(llm_output)
    except (TypeError, ValueError):
        # Not valid JSON: rely on the regex fallbacks below
        llm_output_json = None

    if isinstance(llm_output_json, dict):
        llm_concept = llm_output_json.get('answer')
        llm_equivalence = llm_output_json.get('equivalence')

    # Some models nest the value in another object; take its first value (if any)
    if isinstance(llm_concept, dict):
        llm_concept = next(iter(llm_concept.values()), '')

    if isinstance(llm_equivalence, dict):
        llm_equivalence = next(iter(llm_equivalence.values()), '')

    # The answer may come back as a JSON number or null; normalize to a string
    llm_concept = '' if llm_concept is None else str(llm_concept).strip()

    if not llm_concept.isnumeric():
        # Fall back to the first standalone run of 6+ digits in the raw output
        llm_match = re.search(r'\b(\d{6,})\b', llm_output)
        if llm_match:
            llm_concept = llm_match.group(0)
        else:
            return (llm_output, prompt, None, None, False, False) + usage

    if llm_equivalence not in valid_equivalences:
        # Fall back to the first equivalence keyword found anywhere in the raw output
        equivalence_match = re.search('EQUAL|EQUIVALENT|NARROWER|WIDER|INEXACT', llm_output, re.IGNORECASE)
        if equivalence_match:
            llm_equivalence = equivalence_match.group(0).upper()

    partial_match, exact_match = False, False
    for reference in df_row[source_id_column]:
        # Check concept ID
        if llm_concept == reference.get(source_id_key):
            partial_match = True
            # Check equivalence type
            if llm_equivalence == reference.get(source_equivalence_key):
                partial_match, exact_match = False, True
                break

    return (llm_output, prompt, llm_concept, llm_equivalence, partial_match, exact_match) + usage

In [None]:
class LlamaCppWithUsage(LlamaCpp):
    """LlamaCpp variant that keeps the token usage reported by its last non-streaming call."""

    # Usage counters, overwritten on every non-streaming call
    prompt_tokens = 0
    completion_tokens = 0
    total_tokens = 0

    def _call(self, prompt, stop=None, run_manager=None, **kwargs):
        """Run the model on ``prompt`` and return the generated text.

        In non-streaming mode the ``usage`` entry of the client result is copied into
        ``prompt_tokens``, ``completion_tokens`` and ``total_tokens``. In streaming mode
        the streamed chunks are concatenated and the counters are left untouched.
        """
        if not self.streaming:
            call_params = {**self._get_parameters(stop), **kwargs}
            result = self.client(prompt=prompt, **call_params)

            usage = result['usage']
            self.prompt_tokens = usage.get('prompt_tokens', 0)
            self.completion_tokens = usage.get('completion_tokens', 0)
            self.total_tokens = usage.get('total_tokens', 0)

            return result['choices'][0]['text']

        # Streaming: glue the chunk texts together
        chunk_stream = self._stream(
            prompt=prompt,
            stop=stop,
            run_manager=run_manager,
            **kwargs,
        )
        return ''.join(chunk.text for chunk in chunk_stream)


In [None]:
def download_file(url, local_directory, new_filename=None, infer_extension=False):
    """Download ``url`` into ``local_directory`` unless the file is already there.

    Args:
        url: Address of the file to download.
        local_directory: Destination folder (created when missing).
        new_filename: Name for the local file; defaults to the last path segment of ``url``.
        infer_extension: When True, a HEAD request is issued and ``.csv`` or ``.tar.gz`` is
            appended to the file name if the ``Content-Disposition`` header mentions it.
            Nothing is appended when neither extension is found.

    Returns:
        str: Path of the local file.

    Raises:
        Whatever ``requests`` raises for connection problems or (through
        ``raise_for_status``) HTTP error statuses. No file is left at the returned path
        when the download fails.
    """
    os.makedirs(local_directory, exist_ok=True)

    filename = new_filename or url.split('/')[-1]

    if infer_extension:
        extension = ''
        # requests.head does not follow redirects unless asked to
        response = requests.head(url, allow_redirects=True)
        content_disposition = response.headers.get('Content-Disposition', '')
        if '.csv' in content_disposition:
            extension = 'csv'
        elif '.tar.gz' in content_disposition:
            extension = 'tar.gz'
        if extension:
            filename = '{}.{}'.format(filename, extension)

    local_path = os.path.join(local_directory, filename)

    # If file has not already been downloaded
    if not os.path.exists(local_path):
        # Download next to the final path and rename at the end, so that an interrupted or
        # failed download is never mistaken for a cached file on the next run
        partial_path = local_path + '.part'
        try:
            with requests.get(url, stream=True) as r:
                # Do not store an HTTP error page as if it were the dataset
                r.raise_for_status()
                # The raw stream is not content-decoded (gzip/deflate) unless asked to
                r.raw.decode_content = True
                with open(partial_path, 'wb') as f:
                    shutil.copyfileobj(r.raw, f)
            os.replace(partial_path, local_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)

    return local_path

def safe_literal_eval(x):
    """Parse a Python literal stored as text without raising.

    Args:
        x: Cell value. Usually the string form of a list of dicts.

    Returns:
        - ``x`` unchanged when it is already a list, tuple, dict or set (so applying the
          function twice to the same column does not wipe it);
        - ``None`` when ``x`` is missing (``None`` / NaN);
        - the parsed literal when ``x`` is text holding a valid Python literal;
        - ``[]`` for anything that cannot be parsed.
    """
    # Already parsed: pd.notna() on a container returns an array, whose ambiguous truth
    # value used to be swallowed as a parse error and turned the data into []
    if isinstance(x, (list, tuple, dict, set)):
        return x

    if not isinstance(x, (str, bytes)):
        try:
            missing = bool(pd.isna(x))
        except (TypeError, ValueError):
            missing = False
        return None if missing else []

    try:
        return ast.literal_eval(x)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return []

def run_rag_pipeline(
    embedding_name,
    target_vocabulary,
    target_data,
    target_concept_column,
    target_id_column,
    source_data,
    source_concept_column,
    source_id_column,
    source_id_key,
    source_vocabulary_key,
    source_equivalence_key,
    initial_candidates_k,
    filtering_method,
    filtered_candidates_k,
    llm_repo,
    prompt_template,
    target_column_converters={},
    target_column_separator='',
    source_column_names=[],
    source_column_converters={},
    source_list_columns=[],
    source_filter_column=None,
    sample_size=None,
    llm_file=None,
    n_gpu_layers=-1,
    n_ctx=512,
    temperature=0.0,
    grammar=None,
    api_key=None,
):
    """Run the retrieval + LLM selection pipeline and write results under ``./output``.

    Steps: build or load a FAISS vector store of target concepts, load and filter the
    source dataset, retrieve and filter candidates per source row, ask the LLM to pick a
    candidate, then write the per-row results (TSV), a metrics report (TXT) and a
    compressed copy of the vector store.

    Args:
        embedding_name: Model name passed to ``SentenceTransformerEmbeddings``.
        target_vocabulary: ``'SNOMED'`` and ``'RxNorm'`` enable vocabulary-specific
            filtering of target and source rows; other values skip that filtering.
        target_data: URL of the target data (tab-separated CSV, or a tar.gz holding a
            previously saved vector store).
        target_concept_column: List of target columns to embed; the first one is the label
            shown to the LLM.
        target_id_column: Target column (document metadata key) holding the concept ID.
        source_data: URL of the tab-separated source dataset.
        source_concept_column: List of source columns used as query text; the first one is
            the term shown to the LLM.
        source_id_column: Source column holding the list of reference dicts.
        source_id_key: Key of a reference dict holding the reference concept ID.
        source_vocabulary_key: Key of a reference dict holding its vocabulary.
        source_equivalence_key: Key of a reference dict holding its equivalence type.
        initial_candidates_k: Number of neighbours retrieved from the vector store.
        filtering_method: Passed to ``filter_candidates``.
        filtered_candidates_k: Number of candidates kept for the prompt.
        llm_repo: HuggingFace repo of a local model (together with ``llm_file``), otherwise
            the model name passed to ``ChatOpenAI``.
        prompt_template: Template text with ``{source_concept}`` and
            ``{target_candidates}`` placeholders.
        target_column_converters: ``converters`` for reading the target CSV.
        target_column_separator: Text replaced by a space inside composite target columns.
        source_column_names: Column names for the source CSV; when empty the names are
            taken from the file header.
        source_column_converters: ``converters`` for reading the source CSV.
        source_list_columns: Source columns parsed with ``safe_literal_eval``.
        source_filter_column: When set, rows with a missing value in this column or
            without a valid reference are dropped.
        sample_size: Number of rows to sample (``random_state=0``); capped at the number of
            available rows. ``None`` or 0 uses every row.
        llm_file: File name inside ``llm_repo``; a local model is used only when both
            ``llm_repo`` and ``llm_file`` are given.
        n_gpu_layers: Passed to the local LLM.
        n_ctx: Passed to the local LLM.
        temperature: Sampling temperature for either LLM.
        grammar: URL of a grammar file for the local LLM (optional).
        api_key: HuggingFace token for the local model download, or the OpenAI API key.

    Returns:
        ``'Error: Invalid OpenAI API key.'`` when a remote LLM is selected without
        ``api_key`` (checked before any work is done); otherwise ``None``.

    Raises:
        ValueError: If the downloaded target data is neither CSV nor tar.gz, or if no
            source row is left after filtering.
    """
    use_local_llm = bool(llm_repo and llm_file)

    # Fail fast: without a key the remote LLM cannot be used, so do not spend time
    # building the vector store and querying it first
    if not use_local_llm and not api_key:
        return 'Error: Invalid OpenAI API key.'

    # Load embedding model
    embedding_model = SentenceTransformerEmbeddings(model_name=embedding_name)

    vs_name = '{}-{}-{}'.format(target_vocabulary, embedding_name, '-'.join(target_concept_column)).replace('/', '-').replace('\\', '-')
    vs_path = './vectorstore/{}'.format(vs_name)

    # Column to use for LLM candidates (first column for VS encoding)
    target_candidate_column = target_concept_column[0]

    # If vector store already filled, load from disk
    if os.path.exists(vs_path):
        target_concept_column = '-'.join(target_concept_column)

        # NOTE(security): allow_dangerous_deserialization is switched on; only load
        # vector stores from a trusted origin
        db = FAISS.load_local(vs_path, embedding_model, allow_dangerous_deserialization=True)
    # Else load target dataframe and process entries (original data or compressed VS)
    else:
        # Download target data
        target_path = download_file(target_data, './data', new_filename='target_dataset', infer_extension=True)

        # If target data is CSV, load target dataframe
        if '.csv' in target_path:
            # Read CSV into dataframe
            target_df = pd.read_csv(target_path, sep='\t', converters=target_column_converters)

            if target_vocabulary == 'SNOMED':
                target_df = target_df[(target_df['standard_concept'] == 'S') & (target_df['invalid_reason'].isna())
                    & (target_df['vocabulary_id'] == 'SNOMED') & (target_df['domain_id'].isin(['Procedure', 'Measurement', 'Observation', 'Device', 'Condition']))]
            elif target_vocabulary == 'RxNorm':
                target_df = target_df[(target_df['standard_concept'] == 'S') & (target_df['invalid_reason'].isna())
                    & (target_df['vocabulary_id'].isin(['RxNorm', 'RxNorm Extension']))]

            # Allow composite columns for vectorization (target column)
            if len(target_concept_column) > 1:
                # Work on an independent frame: adding a column to a filtered slice
                # triggers SettingWithCopyWarning
                target_df = target_df.copy()
                target_df['-'.join(target_concept_column)] = target_df.apply(
                    lambda row: ' '.join('' if pd.isna(row[col]) else str(row[col]).replace(target_column_separator, ' ')
                    for col in target_concept_column), axis=1)

            target_concept_column = '-'.join(target_concept_column)

            # Load dataframe
            target_loader = DataFrameLoader(data_frame=target_df, page_content_column=target_concept_column)

            # Encode target documents and store in vectorstore
            db = FAISS.from_documents(target_loader.load(), embedding_model)

            # Save vectorstore to disk
            db.save_local(vs_path)

        # Elif target data is TAR.GZ, uncompress VS into folder
        elif '.tar.gz' in target_path:
            # Create vector store folder
            os.makedirs('./vectorstore', exist_ok=True)

            target_concept_column = '-'.join(target_concept_column)

            # Extract tar.gz file
            with tarfile.open(target_path, 'r:gz') as tar:
                # The archive was downloaded: use the 'data' extraction filter (rejects
                # absolute paths, path traversal and special files) when available
                if hasattr(tarfile, 'data_filter'):
                    tar.extractall(path='./vectorstore', filter='data')
                else:
                    tar.extractall(path='./vectorstore')

            # Load extracted vector store
            # NOTE(security): allow_dangerous_deserialization is switched on; only load
            # vector stores from a trusted origin
            db = FAISS.load_local(vs_path, embedding_model, allow_dangerous_deserialization=True)

        else:
            # Without this branch ``db`` would be undefined and fail with a NameError below
            raise ValueError('Unsupported target data (expected .csv or .tar.gz): {}'.format(target_path))

    # Check vector store
    print('Vector store loaded: {} documents'.format(db.index.ntotal))

    # Download source dataset
    source_path = download_file(source_data, './data', new_filename='source_dataset.csv')

    # Load source dataframe (an empty name list means "use the names from the file header")
    source_df = pd.read_csv(source_path, sep='\t', names=source_column_names or None, converters=source_column_converters, header=0)

    # Column to use for LLM selection (first column for VS querying)
    source_selection_column = source_concept_column[0]

    # Allow composite columns for querying (source column)
    if len(source_concept_column) > 1:
        source_df['-'.join(source_concept_column)] = source_df.apply(
            lambda row: ' '.join('' if pd.isna(row[col]) else str(row[col])
            for col in source_concept_column), axis=1)

    source_concept_column = '-'.join(source_concept_column)

    # Preprocess list columns
    for list_column in source_list_columns:
        source_df[list_column] = source_df[list_column].apply(safe_literal_eval)

    # Filter by target vocabulary (rows whose ID starts with '06' go to RxNorm, the rest to SNOMED)
    if target_vocabulary == 'SNOMED':
        source_df = source_df[source_df.ID.str[:2] != '06']
    elif target_vocabulary == 'RxNorm':
        source_df = source_df[source_df.ID.str[:2] == '06']

    original_df_size = len(source_df)

    # Filter dataframe (non-empty rows, reference vocabulary, valid equivalence)
    if source_filter_column:
        source_df = source_df[~source_df[source_filter_column].isna()]
        source_df = source_df[source_df[source_id_column].apply(lambda x: filter_references(x, source_equivalence_key, ['EQUAL', 'EQUIVALENT', 'NARROWER', 'WIDER', 'INEXACT']))]
        if target_vocabulary == 'SNOMED':
            source_df = source_df[source_df[source_id_column].apply(lambda x: filter_references(x, source_vocabulary_key, ['SNOMED']))]
        elif target_vocabulary == 'RxNorm':
            source_df = source_df[source_df[source_id_column].apply(lambda x: filter_references(x, source_vocabulary_key, ['RxNorm', 'RxNorm Extension']))]

    filtered_df_size = len(source_df)

    # Nothing to map: stop with a clear message instead of an obscure pandas error
    # (or a division by zero) further down
    if filtered_df_size == 0:
        raise ValueError('No source rows left after filtering; nothing to map.')

    # Take sample from dataset (if None, don't sample)
    if sample_size:
        # DataFrame.sample raises when asked for more rows than available
        sample_size = min(sample_size, filtered_df_size)
        source_df = source_df.sample(n=sample_size, random_state=0)
    else:
        sample_size = filtered_df_size

    # Work on an independent frame: the columns added below would otherwise be written
    # into a slice of the loaded data (SettingWithCopyWarning)
    source_df = source_df.copy()

    # Filter initial candidates
    source_df[['initial_candidates', 'initial_matched']] = source_df.apply(query_nearest_concepts, args=(db, source_concept_column, target_candidate_column,
                                                            initial_candidates_k, target_id_column, source_id_column, source_id_key), axis=1, result_type='expand')

    # Filter candidates down to final prompt insertion
    source_df[['filtered_candidates', 'filtered_matched']] = source_df.apply(filter_candidates, args=(filtering_method, source_concept_column,
                                                            filtered_candidates_k, source_id_column, source_id_key), axis=1, result_type='expand')

    # Download and initialize LLM
    if use_local_llm:
        # Download model through HuggingFace
        hf_logging.set_verbosity_error()
        llm_path = hf_hub_download(repo_id=llm_repo, filename=llm_file, local_dir='./models', token=api_key)

        # Set output grammar
        if grammar:
            grammar = download_file(grammar, './grammars', new_filename='output_grammar.gbnf')

        # Initialize local LLM
        llm = LlamaCppWithUsage(
            model_path=llm_path,
            n_gpu_layers=n_gpu_layers,
            n_ctx=n_ctx,
            streaming=False,
            temperature=temperature,
            max_tokens=32,
            grammar_path=grammar,
            callbacks=[StreamingStdOutCallbackHandler()],
            echo=False,
            verbose=True,
        )
        llm_repo = '{}-{}'.format(llm_repo.replace('/', '-'), llm_file)
    else:
        # Initialize remote LLM (the API key was validated at the top)
        llm = ChatOpenAI(openai_api_key=api_key, temperature=temperature, model=llm_repo)

    # Set instruction template
    template = PromptTemplate.from_template(prompt_template)

    # Call LLM selection
    source_df[['llm_output', 'llm_prompt', 'llm_concept', 'llm_equivalence', 'partial_match', 'exact_match',
               'prompt_tokens', 'completion_tokens', 'total_tokens']] = source_df.apply(
        call_llm_rag, args=(llm, template, source_selection_column, source_id_column,
        source_id_key, source_equivalence_key), axis=1, result_type='expand')

    # Create output folder
    os.makedirs('./output', exist_ok=True)

    # Remove auxiliary column (source concept)
    if source_selection_column != source_concept_column:
        source_df = source_df.drop(source_concept_column, axis=1)

    # Write result dataframe to file
    run_timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    df_output_filename = 'rag-{}-{}-{}-{}-{}-{}-{}.csv'.format(vs_name, source_concept_column, initial_candidates_k,
        filtering_method, filtered_candidates_k, llm_repo, run_timestamp)
    source_df.to_csv('./output/{}'.format(df_output_filename), sep='\t', index=False)

    # Calculate match metrics
    initial_candidates_coverage = len(source_df[source_df['initial_matched'] == True])
    filtered_candidates_coverage = len(source_df[source_df['filtered_matched'] == True])
    partial_matches = len(source_df[source_df['partial_match'] == True])
    exact_matches = len(source_df[source_df['exact_match'] == True])
    prompt_tokens_sum = source_df['prompt_tokens'].sum()
    completion_tokens_sum = source_df['completion_tokens'].sum()
    total_tokens_sum = source_df['total_tokens'].sum()

    # Print results (run parameters and match metrics)
    run_results = [
        'RAG results - {}'.format(run_timestamp),
        '====================================================',
        'Parameters:',
        'Target vocabulary: {}'.format(target_vocabulary),
        'LLM repo: {}'.format(llm_repo),
        'Embedding: {}'.format(embedding_name),
        'Target concept column(s): {}'.format(target_concept_column),
        'Source concept column(s): {}'.format(source_concept_column),
        'Initial candidates K: {}'.format(initial_candidates_k),
        'Filtering method: {}'.format(filtering_method),
        'Filtered candidates K: {}'.format(filtered_candidates_k),
        '====================================================',
        'Match metrics',
        'Original dataset size = {}'.format(original_df_size),
        'Filtered dataset size ({}) = {}'.format(source_filter_column, filtered_df_size),
        'Sampled dataset size = {}'.format(sample_size),
        'Prompt tokens sum = {}'.format(prompt_tokens_sum),
        'Completion tokens sum = {}'.format(completion_tokens_sum),
        'Total tokens sum = {}'.format(total_tokens_sum),
        'Initial candidates (k={}) coverage = {}/{} ({:.3f})'.format(
            initial_candidates_k, initial_candidates_coverage, sample_size, initial_candidates_coverage / sample_size),
        'Filtered candidates (k={}) coverage = {}/{} ({:.3f})'.format(
            filtered_candidates_k, filtered_candidates_coverage, initial_candidates_coverage,
            filtered_candidates_coverage / initial_candidates_coverage if initial_candidates_coverage != 0 else 0),
        'Partial matches (only concept) = {}/{} ({:.3f})'.format(
            partial_matches, filtered_candidates_coverage, partial_matches / filtered_candidates_coverage if filtered_candidates_coverage != 0 else 0),
        'Exact matches (concept and equivalence) = {}/{} ({:.3f})'.format(
            exact_matches, filtered_candidates_coverage, exact_matches / filtered_candidates_coverage if filtered_candidates_coverage != 0 else 0),
    ]

    run_results = '\n'.join(run_results)
    print(run_results)

    # Write match metrics to file
    with open('./output/rag-results-{}.txt'.format(run_timestamp), 'w') as f:
        f.write(run_results)

    # Compress vector store into output folder (if it is not already compressed)
    if not os.path.exists('./output/{}.tar.gz'.format(vs_name)):
        with tarfile.open('./output/{}.tar.gz'.format(vs_name), 'w:gz') as tar:
            tar.add(vs_path, arcname=os.path.basename(vs_path))

    # Unload model from memory (drops this function's reference to it)
    llm = None

In [None]:
# System instructions shared by every prompt variant below.
# This is a plain (non-f) string, so the doubled braces around the JSON example are kept
# as written; the final prompt is later used as a format template (see run_rag_pipeline),
# where doubled braces presumably collapse to literal single braces — verify against PromptTemplate.
SYSTEM_RAG_PROMPT = """You are a helpful assistant.
It does not matter whether the given term is a valid or complete medical term, your task is finding the closest one in the list.
Return your answer in JSON with the format {{"answer": "<numeric code>", "equivalence": "<equivalence_type>"}}, where equivalence_type must be
one of the following: EQUAL, EQUIVALENT, WIDER, NARROWER or INEXACT.
Copy the exact code in the answer."""

# User turn: {source_concept} and {target_candidates} are the two placeholders that
# call_llm_rag fills in for every source row.
USER_RAG_PROMPT = """Q: The given term "{source_concept}" is closest to which of the following alternatives:
{target_candidates}
A:"""

In [None]:
# Prompt without any chat markup: system text followed by the user text.
# The f-string only splices the two constants in; the placeholders inside them are
# inserted verbatim and stay available for later template formatting.
BASIC_RAG_PROMPT = f'''{SYSTEM_RAG_PROMPT}
{USER_RAG_PROMPT}'''

In [None]:
# Prompt wrapping the system and user text in <|im_start|> / <|im_end|> role markers and
# ending with an open assistant turn.
# NOTE(review): these markers look like the ChatML convention — confirm they match the
# chat format expected by the model this template is used with.
LLAMA_RAG_PROMPT = f'''<|im_start|>system
{SYSTEM_RAG_PROMPT}<|im_end|>
<|im_start|>user
{USER_RAG_PROMPT}<|im_end|>
<|im_start|>assistant'''

In [None]:
# Prompt wrapping the system text in <<SYS>> ... <</SYS>> and the whole turn in
# [INST] ... [/INST].
# NOTE(review): the <<SYS>> block looks like the Llama-2 chat convention — confirm it is
# the format expected by the Mistral model this template is used with.
MISTRAL_RAG_PROMPT = f"""[INST] <<SYS>>
{SYSTEM_RAG_PROMPT}
<</SYS>>
{USER_RAG_PROMPT}
[/INST]"""

In [None]:
# Example usage
# The values written as <...> (target_data, source_data, grammar) are placeholders and must
# be replaced with real URLs before running this cell.
# llm_repo + llm_file select a local GGUF model downloaded from HuggingFace; leaving
# llm_file unset switches to the remote OpenAI model named by llm_repo, which requires api_key.
run_rag_pipeline(embedding_name='thenlper/gte-large', # HuggingFace repo for embedding model
                       target_vocabulary='SNOMED', # SNOMED or RxNorm
                       target_data='<OMOP vocabularies.csv>', # Available at OHDSI Athena (athena.ohdsi.org)
                       target_concept_column=['concept_name', 'concept_synonym_name'],
                       target_id_column='concept_id',
                       source_data='<SIGTAP with annotated correspondences.csv>', # From reference dataset
                       source_concept_column=['Name', 'Description'],
                       source_id_column='Annotations',
                       source_id_key='conceptId',
                       source_vocabulary_key='vocabulary_id',
                       source_equivalence_key='equivalence',
                       initial_candidates_k=1000,
                       filtering_method='embeddings', # embeddings or fuzzy_ratio
                       filtered_candidates_k=50,
                       llm_repo='TheBloke/Mistral-7B-Instruct-v0.2-GGUF', # HuggingFace repo for LLM
                       prompt_template=MISTRAL_RAG_PROMPT, # BASIC_RAG_PROMPT, LLAMA_RAG_PROMPT or MISTRAL_RAG_PROMPT
                       target_column_converters={'concept_id': str, 'concept_code': str},
                       target_column_separator='£',
                       source_column_names=['ID', 'Name', 'Description', 'sourceCode', 'Annotations'],
                       source_column_converters={'ID': str, 'sourceCode': str},
                       source_list_columns=['Annotations'],
                       source_filter_column='Annotations',
                       sample_size=1, # None, for no sampling
                       llm_file='mistral-7b-instruct-v0.2.Q2_K.gguf', # HuggingFace filename for LLM
                       n_gpu_layers=-1,
                       n_ctx=32768,
                       temperature=0.0,
                       grammar='<JSON grammar>', # Grammar-constrained LLM sampling (JSON generation)
                       api_key=None, # HuggingFace or OpenAI API key
                       )