# Tutorial: Creating a Hybrid Retrieval Pipeline

- **Level**: Intermediate
- **Time to complete**: 15 minutes
- **Components Used**: [`DocumentSplitter`](https://docs.haystack.deepset.ai/docs/documentsplitter), [`SentenceTransformersDocumentEmbedder`](https://docs.haystack.deepset.ai/docs/sentencetransformersdocumentembedder), [`DocumentJoiner`](https://docs.haystack.deepset.ai/docs/documentjoiner), [`InMemoryDocumentStore`](https://docs.haystack.deepset.ai/docs/inmemorydocumentstore), [`InMemoryBM25Retriever`](https://docs.haystack.deepset.ai/docs/inmemorybm25retriever), [`InMemoryEmbeddingRetriever`](https://docs.haystack.deepset.ai/docs/inmemoryembeddingretriever), and [`TransformersSimilarityRanker`](https://docs.haystack.deepset.ai/docs/transformerssimilarityranker)
- **Prerequisites**: None
- **Goal**: After completing this tutorial, you will have learned how to create a hybrid retrieval pipeline and when it's useful.

In [1]:
import os
import sys

import pandas as pd

from datasets import load_dataset
from haystack import Document

# Get the current working directory of the notebook
notebook_dir = os.getcwd()
# Add the parent directory to the system path so the project's helper modules can be imported
sys.path.append(os.path.join(notebook_dir, '../'))

from data_processing import DataProcessing

In [2]:
pd.set_option('max_colwidth', 800)
# pd.set_option('display.max_columns', None)
# pd.set_option('display.max_rows', None)

## Load Data

- **Problems:**
    1. Multiple files, each with distinct retrieved results.
    2. Appending to the same file can duplicate retrieved results. For example, the first run retrieves 0 to 7; the second run is meant to add 7 to 14 but also includes 0 to 7 again, so the file ends up as 0 to 7, 0 to 7, then 7 to 14.

- `new_file = True`: use only on the first run.
- `new_file = False`: use on every later run. The next step, LLM labeling (prediction/non-prediction), reads this saved file and writes its results back to the same file.
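One way to avoid problem 2 is to append only the rows whose text is not already in the saved file. The sketch below is a minimal, hypothetical helper (`append_new_rows` is not part of the project's `DataProcessing` module); it uses plain dicts in place of DataFrame rows and assumes the sentence text is a good deduplication key.

```python
def append_new_rows(existing_rows, new_rows, key="text"):
    """Return existing_rows plus only those new_rows whose key is not yet present.

    Hypothetical helper: rows are plain dicts and `key` names the column used
    to detect duplicates (the sentence text in this notebook).
    """
    seen = {row[key] for row in existing_rows}
    merged = list(existing_rows)
    for row in new_rows:
        if row[key] not in seen:
            merged.append(row)
            seen.add(row[key])  # also guards against duplicates inside new_rows
    return merged


# First run retrieved sentences 0-6; the second run returned 0-6 again plus 7-13.
first_run = [{"text": f"sentence {i}"} for i in range(7)]
second_run = [{"text": f"sentence {i}"} for i in range(14)]
merged = append_new_rows(first_run, second_run)
print(len(merged))  # 14, not 21
```

With DataFrames, `pd.concat([df, new_df]).drop_duplicates(subset="text")` would express the same idea.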

In [7]:
new_file = True
base_data_path = os.path.join(notebook_dir, '../data/', 'financial_phrase_bank')
if new_file:
    # First run: start from the raw Financial PhraseBank file
    new_file_name = "all_data-adjusted_header.csv"
else:
    # Later runs: reload the file that already holds earlier results
    new_file_name = "text_label_name_meta_data-v1.csv"

financial_full_path = os.path.join(base_data_path, new_file_name)
dataset = load_dataset("csv", data_files=financial_full_path, split="train")

dataset


Dataset({
    features: ['sentiment', 'sentence'],
    num_rows: 4846
})

## Overview

**Hybrid Retrieval** combines keyword-based and embedding-based retrieval techniques, leveraging the strengths of both approaches. Dense embeddings capture the contextual meaning of a query, while keyword-based methods excel at matching exact terms.

There are many cases where a simple keyword-based approach like BM25 performs better than dense retrieval (for example, in a specialized domain like healthcare), because a dense model needs to be trained on data from that domain to represent it well. For more details about Hybrid Retrieval, check out [Blog Post: Hybrid Document Retrieval](https://haystack.deepset.ai/blog/hybrid-retrieval).
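To make the keyword side concrete, here is a minimal pure-Python sketch of classic Okapi BM25 scoring, assuming whitespace tokenization and the common defaults `k1=1.5` and `b=0.75`. It is an illustration only: Haystack's `InMemoryBM25Retriever` has its own tokenizer and may use a different BM25 variant. What it shows is that a document scores only when it shares exact terms with the query, which is BM25's strength (precise term matching) and its weakness (no credit for synonyms).

```python
import math
from collections import Counter


def bm25_scores(query_text, documents, k1=1.5, b=0.75):
    """Score each document against the query with classic Okapi BM25."""
    if not documents:
        return []
    tokenized = [doc.lower().split() for doc in documents]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs
    # Document frequency: the number of documents each term appears in
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))

    scores = []
    for tokens in tokenized:
        term_freq = Counter(tokens)
        score = 0.0
        for term in query_text.lower().split():
            if term not in term_freq:
                continue  # no exact match, no contribution
            idf = math.log((n_docs - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5) + 1)
            length_norm = term_freq[term] + k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * term_freq[term] * (k1 + 1) / length_norm
        scores.append(score)
    return scores


sample_docs = [
    "operating profit rose to EUR 13.1 mn",
    "the company will propose a dividend for 2010",
    "net sales increased by 18.5 %",
]
print(bm25_scores("dividend", sample_docs))  # only the second document scores above 0
print(bm25_scores("payout", sample_docs))  # [0.0, 0.0, 0.0]: no exact term match
```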

## Initializing the DocumentStore

You'll start creating your question answering system by initializing a DocumentStore. A DocumentStore stores the Documents that your system uses to find answers to your questions. In this tutorial, you'll be using the [`InMemoryDocumentStore`](https://docs.haystack.deepset.ai/docs/inmemorydocumentstore).

In [None]:
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

> `InMemoryDocumentStore` is the simplest DocumentStore to get started with. It requires no external dependencies and it's a good option for smaller projects and debugging. But it doesn't scale up so well to larger Document collections, so it's not a good choice for production systems. To learn more about the different types of external databases that Haystack supports, see [DocumentStore Integrations](https://haystack.deepset.ai/integrations?type=Document+Store&version=2.0).

## Fetching and Processing Documents

For searching, use the *sentence* column. The remaining column, *sentiment*, is stored as metadata, which you can use for [metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering).

In [None]:
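docs = []

for doc in dataset:
    # Search on the sentence text; keep the sentiment label as metadata
    docs.append(
        Document(content=doc["sentence"], meta={"sentiment": doc["sentiment"]})
    )
# Index only the first 7 sentences to keep this run small
docs = docs[:7]
docs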
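Because `sentiment` lives in each Document's `meta`, it can narrow retrieval. Haystack 2.x expresses filters as dictionaries such as `{"field": "meta.sentiment", "operator": "==", "value": "positive"}`. The sketch below is a hypothetical, simplified evaluator for that shape, written only to show how such a filter selects documents; it supports a small subset of operators and is not Haystack's implementation. In the real pipeline you would pass the dictionary through a retriever's `filters` run parameter instead.

```python
def matches_filter(meta, filter_spec):
    """Evaluate a Haystack-style filter dictionary against one document's metadata.

    Simplified sketch: supports only ==, !=, in, AND and OR.
    """
    operator = filter_spec["operator"]
    if operator in ("AND", "OR"):
        results = [matches_filter(meta, cond) for cond in filter_spec["conditions"]]
        return all(results) if operator == "AND" else any(results)

    # Comparison filters address metadata fields as "meta.<name>"
    field = filter_spec["field"]
    if field.startswith("meta."):
        field = field[len("meta."):]
    value = meta.get(field)
    if operator == "==":
        return value == filter_spec["value"]
    if operator == "!=":
        return value != filter_spec["value"]
    if operator == "in":
        return value in filter_spec["value"]
    raise ValueError(f"Unsupported operator: {operator}")


sample_meta = [{"sentiment": "positive"}, {"sentiment": "neutral"}, {"sentiment": "negative"}]
non_neutral = {"field": "meta.sentiment", "operator": "!=", "value": "neutral"}
print([m["sentiment"] for m in sample_meta if matches_filter(m, non_neutral)])
# ['positive', 'negative']
```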

## Indexing Documents with a Pipeline

Create a pipeline to store the data in the document store with their embedding. For this pipeline, you need a [DocumentSplitter](https://docs.haystack.deepset.ai/docs/documentsplitter) to split documents into chunks of 512 words, [SentenceTransformersDocumentEmbedder](https://docs.haystack.deepset.ai/docs/sentencetransformersdocumentembedder) to create document embeddings for dense retrieval and [DocumentWriter](https://docs.haystack.deepset.ai/docs/documentwriter) to write documents to the document store.

As an embedding model, you will use [BAAI/bge-small-en-v1.5](https://huggingface.co/BAAI/bge-small-en-v1.5) on Hugging Face. Feel free to test other models on Hugging Face or use another [Embedder](https://docs.haystack.deepset.ai/docs/embedders) to switch the model provider.

> If this step takes too long for you, replace the embedding model with a smaller model such as `sentence-transformers/all-MiniLM-L6-v2`. Make sure that `split_length` (counted in words here) stays within your model's token limit.
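The sketch below mimics what `split_by="word"`, `split_length`, and `split_overlap` mean: fixed-size word windows where each window repeats the last `split_overlap` words of the previous one. It is a simplified illustration under stated assumptions (whitespace tokenization, no handling of the metadata Haystack attaches to each chunk), not the `DocumentSplitter` implementation. PhraseBank sentences are far shorter than 512 words, so in this notebook each sentence stays a single chunk.

```python
def split_by_words(text, split_length, split_overlap):
    """Split text into windows of split_length words that overlap by split_overlap words."""
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be >= 0 and smaller than split_length")
    words = text.split()
    step = split_length - split_overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + split_length]))
        if start + split_length >= len(words):
            break  # this window already reaches the end of the text
    return chunks


sample_text = " ".join(f"w{i}" for i in range(10))
for chunk in split_by_words(sample_text, split_length=4, split_overlap=1):
    print(chunk)
# w0 w1 w2 w3
# w3 w4 w5 w6
# w6 w7 w8 w9
```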

In [None]:
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.preprocessors.document_splitter import DocumentSplitter
from haystack import Pipeline

# Split long documents into overlapping 512-word chunks
document_splitter = DocumentSplitter(split_by="word", split_length=512, split_overlap=32)
# Compute a dense embedding for each chunk with the same model used at query time
document_embedder = SentenceTransformersDocumentEmbedder(
    model="BAAI/bge-small-en-v1.5"
)
# Write the embedded chunks to the document store
document_writer = DocumentWriter(document_store)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("document_splitter", document_splitter)
indexing_pipeline.add_component("document_embedder", document_embedder)
indexing_pipeline.add_component("document_writer", document_writer)

indexing_pipeline.connect("document_splitter", "document_embedder")  # chunks -> embedder
indexing_pipeline.connect("document_embedder", "document_writer")  # embedded chunks -> store

indexing_pipeline.run({"document_splitter": {"documents": docs}})

The documents are now stored in the `InMemoryDocumentStore` with their embeddings. Now it's time to create the hybrid retrieval pipeline ✅

## Creating a Pipeline for Hybrid Retrieval

Hybrid retrieval refers to the combination of multiple retrieval methods to enhance overall performance. In the context of search systems, a hybrid retrieval pipeline executes both traditional keyword-based search and dense vector search, later ranking the results with a **cross-encoder model**. This combination allows the search system to leverage the strengths of different approaches, providing more accurate and diverse results.

Here are the required steps for a hybrid retrieval pipeline:

### 1) Initialize Retrievers and the Embedder

Initialize an [InMemoryEmbeddingRetriever](https://docs.haystack.deepset.ai/docs/inmemoryembeddingretriever) and an [InMemoryBM25Retriever](https://docs.haystack.deepset.ai/docs/inmemorybm25retriever) to perform both dense and keyword-based retrieval. For dense retrieval, you also need a [SentenceTransformersTextEmbedder](https://docs.haystack.deepset.ai/docs/sentencetransformerstextembedder) that computes the embedding of the search query using the same embedding model, `BAAI/bge-small-en-v1.5`, that was used in the indexing pipeline:

In [None]:
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.embedders import SentenceTransformersTextEmbedder

text_embedder = SentenceTransformersTextEmbedder(
    model="BAAI/bge-small-en-v1.5"
)
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
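Conceptually, the embedding retriever compares the query vector from `text_embedder` with every stored document vector and returns the closest ones. The sketch below shows that brute-force search with cosine similarity on made-up 3-dimensional vectors. Real `bge-small-en-v1.5` embeddings have 384 dimensions, and `InMemoryDocumentStore` lets you configure its similarity function, so treat this as an illustration rather than the store's exact scoring. The linear scan over all documents is also why an in-memory store suits small collections.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def dense_top_k(query_vec, doc_vecs, top_k=2):
    """Return (index, score) pairs for the top_k document vectors most similar to the query."""
    scored = [(i, cosine(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_k]


# Toy vectors standing in for embeddings (hypothetical values)
doc_vectors = [[0.9, 0.1, 0.0], [0.1, 0.9, 0.1], [0.7, 0.3, 0.1]]
query_vector = [1.0, 0.0, 0.0]
print([i for i, _ in dense_top_k(query_vector, doc_vectors)])  # [0, 2]
```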

### 2) Join Retrieval Results

Haystack offers several joining methods in [`DocumentJoiner`](https://docs.haystack.deepset.ai/docs/documentjoiner) for different use cases, such as `merge` and `reciprocal_rank_fusion`. In this example, you will use the default `concatenate` mode to join the documents coming from the two Retrievers, because the [Ranker](https://docs.haystack.deepset.ai/docs/rankers) will be the main component that ranks the documents for relevancy.

In [None]:
from haystack.components.joiners import DocumentJoiner

document_joiner = DocumentJoiner()
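To see how joining modes differ, the sketch below implements two of them on plain `(doc_id, score)` lists: a concatenation that keeps one copy of each document (with its highest score), and textbook reciprocal rank fusion (RRF), which ignores raw scores and sums `1 / (k + rank)` across lists with the commonly used `k = 60`. This is an illustration under those assumptions; `DocumentJoiner`'s own implementation may differ in details such as duplicate handling and score normalization. BM25 and cosine scores live on different scales, which is why RRF works from ranks, and why plain concatenation is enough here: the Ranker rescores every joined document anyway.

```python
from collections import defaultdict


def concatenate(*result_lists):
    """Join result lists, keeping each document id once with its highest score."""
    best = {}
    for results in result_lists:
        for doc_id, score in results:
            if doc_id not in best or score > best[doc_id]:
                best[doc_id] = score
    return list(best.items())


def reciprocal_rank_fusion(*result_lists, k=60):
    """Fuse ranked lists by summing 1 / (k + rank) per document (rank starts at 1)."""
    fused = defaultdict(float)
    for results in result_lists:
        ranked = sorted(results, key=lambda pair: pair[1], reverse=True)
        for rank, (doc_id, _) in enumerate(ranked, start=1):
            fused[doc_id] += 1.0 / (k + rank)
    return sorted(fused.items(), key=lambda pair: pair[1], reverse=True)


# Hypothetical scores: BM25 scores are unbounded, cosine scores are at most 1
bm25_results = [("d1", 7.2), ("d2", 3.1), ("d3", 0.4)]
dense_results = [("d2", 0.91), ("d4", 0.88), ("d1", 0.52)]

print(sorted(concatenate(bm25_results, dense_results)))
# [('d1', 7.2), ('d2', 3.1), ('d3', 0.4), ('d4', 0.88)]
print([doc_id for doc_id, _ in reciprocal_rank_fusion(bm25_results, dense_results)])
# ['d2', 'd1', 'd4', 'd3']
```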

### 3) Rank the Results

Use the [TransformersSimilarityRanker](https://docs.haystack.deepset.ai/docs/transformerssimilarityranker), which scores the relevancy of all retrieved documents for the given search query using a cross-encoder model. In this example, you will use the [BAAI/bge-reranker-base](https://huggingface.co/BAAI/bge-reranker-base) model to rank the retrieved documents, but you can replace it with other cross-encoder models on Hugging Face.

In [None]:
from haystack.components.rankers import TransformersSimilarityRanker

ranker = TransformersSimilarityRanker(model="BAAI/bge-reranker-base")
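A cross-encoder reads the query and a document together and outputs one relevance score per pair. This is typically more accurate but slower than comparing precomputed embeddings, which is why it is applied only to the short list the retrievers return. The sketch below shows just the reranking step with a pluggable `score_pair` function. The word-overlap scorer is a hypothetical stand-in so the example runs without a model; it is not a cross-encoder, and in this notebook `TransformersSimilarityRanker` with `BAAI/bge-reranker-base` plays that role.

```python
def rerank(query_text, documents, score_pair, top_k=None):
    """Score every (query, document) pair and sort by descending score."""
    scored = [(doc, score_pair(query_text, doc)) for doc in documents]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored if top_k is None else scored[:top_k]


def word_overlap(query_text, doc):
    """Hypothetical stand-in scorer: fraction of query words that appear in the document."""
    query_words = set(query_text.lower().split())
    doc_words = set(doc.lower().split())
    return len(query_words & doc_words) / len(query_words) if query_words else 0.0


candidates = [
    "net sales increased by 18.5 %",
    "the board will propose a dividend",
    "the company will propose a higher dividend next year",
]
for candidate, score in rerank("will propose a dividend next year", candidates, word_overlap):
    print(f"{score:.2f}  {candidate}")
# 1.00  the company will propose a higher dividend next year
# 0.67  the board will propose a dividend
# 0.00  net sales increased by 18.5 %
```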

### 4) Create the Hybrid Retrieval Pipeline

Add all initialized components to your pipeline and connect them.

In [None]:
from haystack import Pipeline

hybrid_retrieval = Pipeline()
hybrid_retrieval.add_component("text_embedder", text_embedder)
hybrid_retrieval.add_component("embedding_retriever", embedding_retriever)
hybrid_retrieval.add_component("bm25_retriever", bm25_retriever)
hybrid_retrieval.add_component("document_joiner", document_joiner)
hybrid_retrieval.add_component("ranker", ranker)

hybrid_retrieval.connect("text_embedder", "embedding_retriever")
hybrid_retrieval.connect("bm25_retriever", "document_joiner")
hybrid_retrieval.connect("embedding_retriever", "document_joiner")
hybrid_retrieval.connect("document_joiner", "ranker")
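The `connect()` calls define a directed graph, and a component can run only after every component feeding it has produced output. The sketch below rebuilds the same edges and orders them with Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+). It illustrates dependency order only; Haystack's own run scheduler is more involved, and the query text you pass directly to `bm25_retriever` and `ranker` at run time is not an edge here.

```python
from graphlib import TopologicalSorter

# Each key lists the components it receives input from (mirrors the connect() calls above)
dependencies = {
    "embedding_retriever": {"text_embedder"},
    "document_joiner": {"bm25_retriever", "embedding_retriever"},
    "ranker": {"document_joiner"},
}

# static_order() also includes nodes that appear only as predecessors
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Both retrievers must finish before `document_joiner` runs, and the ranker always runs last.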

### 5) Visualize the Pipeline (Optional)

To understand how you formed a hybrid retrieval pipeline, use the pipeline's [draw()](https://docs.haystack.deepset.ai/docs/drawing-pipeline-graphs) method. If you're running this notebook on Google Colab, the generated file will be saved in the "Files" section on the sidebar.

In [None]:
# save_img_name = "hybrid-retrieval.png"
# save_image_path = os.path.join(notebook_dir, "../data/hybrid_retrieval/", save_img_name)
# hybrid_retrieval.draw(path=save_image_path)

## Testing the Hybrid Retrieval

Pass the query to `text_embedder`, `bm25_retriever` and `ranker` and run the retrieval pipeline:


In [None]:
prediction_properties = DataProcessing.load_prediction_properties()
prediction_properties

In [None]:
future_verbs = [
    "will",
    "shall",
    "would",
    "going",
    "might",
    "should",
    "could",
    "may",
    "must",
    "can"
]
query = f"""Can you identify the predictions from the documents? I define a prediction as: {prediction_properties}. Note that the documents should be future tense like {future_verbs}.
    
    Some examples of predictions in the PhraseBank dataset are \n
        1. According to Gran , the company has no plans to move all production to Russia , although that is where the company is growing . \n
        2. According to the company 's updated strategy for the years 2009-2012 , Basware targets a long-term net sales growth in the range of 20 % -40 % with an operating profit margin of 10 % -20 % of net sales .
        3. Its board of directors will propose a dividend of EUR0 .12 per share for 2010 , up from the EUR0 .08 per share paid in 2009 .
    Some examples of non-predictions in the PhraseBank dataset are \n
        1. Net sales increased to EUR193 .3 m from EUR179 .9 m and pretax profit rose by 34.2 % to EUR43 .1 m. ( EUR1 = USD1 .4 )
        2. Net sales surged by 18.5 % to EUR167 .8 m. Teleste said that EUR20 .4 m , or 12.2 % , of the sales came from the acquisitions made in 2009 .
        3. STORA ENSO , NORSKE SKOG , M-REAL , UPM-KYMMENE Credit Suisse First Boston ( CFSB ) raised the fair value for shares in four of the largest Nordic forestry groups .
"""

retrieved_result = hybrid_retrieval.run(
    {"text_embedder": {"text": query}, "bm25_retriever": {"query": query}, "ranker": {"query": query}}
)
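The same query string has to reach three entry points: `text_embedder` (whose embedding then feeds `embedding_retriever`, which receives a vector rather than text), `bm25_retriever`, and `ranker`. A small hypothetical helper such as `build_hybrid_inputs` below keeps those three inputs in sync so one of them cannot accidentally receive a different query.

```python
def build_hybrid_inputs(query_text):
    """Hypothetical helper: route one query to every component that needs the raw text."""
    return {
        "text_embedder": {"text": query_text},  # embedded, then passed to embedding_retriever
        "bm25_retriever": {"query": query_text},  # keyword matching on the raw text
        "ranker": {"query": query_text},  # the cross-encoder scores (query, document) pairs
    }


print(build_hybrid_inputs("Will the company raise its dividend?"))
```

`hybrid_retrieval.run(build_hybrid_inputs(query))` would then be equivalent to the call above.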

In [None]:
retrieved_result["ranker"]["documents"]
retrieved_result_df = pd.DataFrame(retrieved_result["ranker"]["documents"])
retrieved_result_df['query'] = query
retrieved_result_df

## Save Data

In [None]:
if new_file:
    # First run: save the retrieved results to a new file
    path = os.path.join(notebook_dir, "../data/rag/retrieved/")
    prefix = "retrieved_results"
    save_file_type = "csv"
    DataProcessing.save_to_file(retrieved_result_df, path, prefix, save_file_type)
else:
    # Later runs: load the previously saved file so the new results can be appended to it
    path = os.path.join(notebook_dir, "../data/rag/retrieved/", "text_label_name_meta_data-v1.csv")
    save_file_type = "csv"
    df = DataProcessing.load_from_file(path, save_file_type)
    # Drop the index column that was written to the CSV on the previous save
    df.drop(['Unnamed: 0'], axis=1, inplace=True)
    df

In [None]:
content = retrieved_result_df.content
content

In [None]:
meta_data_df = retrieved_result_df.drop(['content'], axis=1)
meta_data_df.head(1)

In [None]:
new_df = pd.DataFrame(content)
new_df

In [None]:
meta_datas = []
for idx, row in meta_data_df.iterrows():
    meta_datas.append(row)

meta_datas

In [None]:
new_df['meta_data'] = meta_datas
new_df.rename(columns={'content': 'text'}, inplace=True)
new_df

In [None]:
# df holds the previously saved results loaded above (new_file = False)
updated_df = pd.concat([df, new_df])
updated_df

In [None]:
path = os.path.join(notebook_dir, "../data/rag/retrieved/")
prefix = "text_label_name_meta_data"
save_file_type = "csv"
DataProcessing.save_to_file(updated_df, path, prefix, save_file_type)

### Pretty Print the Results
Create a function to print a kind of *search page*.

In [None]:
def pretty_print_results(prediction):
    for doc in prediction["documents"]:
        print(doc.content, "\t", doc.score)
        # The sentiment label is available as doc.meta["sentiment"]
        print("\n", "\n")

In [None]:
pretty_print_results(retrieved_result["ranker"])

## What's next

🎉 Congratulations! You've created a hybrid retrieval pipeline!

If you'd like to use this retrieval method in a RAG pipeline, check out [Tutorial: Creating Your First QA Pipeline with Retrieval-Augmentation](https://haystack.deepset.ai/tutorials/27_first_rag_pipeline) to learn about the next steps.

To stay up to date on the latest Haystack developments, you can [sign up for our newsletter](https://landing.deepset.ai/haystack-community-updates) or [join Haystack discord community](https://discord.gg/haystack).

Thanks for reading!