In [None]:
# Deep dive with haystack pipelines
import os
from typing import List

from haystack import Pipeline
from haystack.utils import launch_es
from haystack.document_stores import ElasticsearchDocumentStore
from haystack.nodes import BM25Retriever, EmbeddingRetriever, FARMReader
from haystack.utils import (
    print_answers,
    print_documents,
    fetch_archive_from_http,
    convert_files_to_docs,
    clean_wiki_text,
)

In [None]:
# Download and prepare data - 517 Wikipedia articles for Game of Thrones.
# The target directory can be overridden with the PIPELINE_DEEP_DIVE_DATA_DIR
# environment variable so the notebook is not tied to one machine's filesystem;
# when the variable is unset the original location is used.
doc_dir = os.environ.get("PIPELINE_DEEP_DIVE_DATA_DIR", "/mnt/sda/haystack/pipeline_deep_dive_data")
s3_url = "https://s3.eu-central-1.amazonaws.com/deepset.ai-farm-qa/datasets/documents/wiki_gameofthrones_txt11.zip"
fetch_archive_from_http(url=s3_url, output_dir=doc_dir)

# Convert the downloaded files into documents that can be indexed into the document store.
# Each file is passed through clean_wiki_text and split into paragraphs.
got_docs = convert_files_to_docs(dir_path=doc_dir, clean_func=clean_wiki_text, split_paragraphs=True)

In [None]:
# Initialize DocumentStore and index documents.
# The Elasticsearch password is read from the environment so that no secret is
# stored in the notebook.
# NOTE(review): a password was previously committed in this cell - rotate it.
es_password = os.environ.get("ELASTICSEARCH_PASSWORD")
if not es_password:
    raise RuntimeError("Set the ELASTICSEARCH_PASSWORD environment variable before running this cell.")

document_store = ElasticsearchDocumentStore(
    host="localhost",
    port=9200,
    username="elastic",
    password=es_password,
    index="document",
    scheme="https",
    verify_certs=True,
    # CA certificate used to verify the HTTPS connection; override the path
    # with ELASTICSEARCH_CA_CERTS when running on another machine.
    ca_certs=os.environ.get("ELASTICSEARCH_CA_CERTS", "/home/tanvir/work/qa-experiment/http_ca.crt"),
)

# Remove previously indexed documents before writing the fresh set.
document_store.delete_documents()
document_store.write_documents(got_docs)

# Initialize sparse (keyword-based) retriever
bm25_retriever = BM25Retriever(document_store=document_store)

# Initialize dense retriever.
# document_store is passed by keyword so the call does not depend on the
# positional parameter order of EmbeddingRetriever.
embedding_retriever = EmbeddingRetriever(
    document_store=document_store,
    model_format="sentence_transformers",
    embedding_model="sentence-transformers/multi-qa-mpnet-base-dot-v1",
)
# Compute embeddings with the dense retriever; existing embeddings are not
# recomputed (update_existing_embeddings=False).
document_store.update_embeddings(embedding_retriever, update_existing_embeddings=False)

# Initialize reader
reader = FARMReader(model_name_or_path="deepset/roberta-base-squad2")

In [None]:
# Haystack ships prebuilt pipelines for common tasks; ExtractiveQAPipeline is one of them.
from haystack.pipelines import ExtractiveQAPipeline
from haystack.pipelines import GenerativeQAPipeline, FAQPipeline
from haystack.nodes import RAGenerator

# Prebuilt pipeline: BM25 retrieval followed by the reader
p_extractive_premade = ExtractiveQAPipeline(retriever=bm25_retriever, reader=reader)
res = p_extractive_premade.run(
    query="Who is the father of Arya Stark?",
    params={
        "Retriever": {"top_k": 10},
        "Reader": {"top_k": 5},
    },
)
print_answers(res, details="minimum")

In [None]:
# For the retrieval step on its own, wrap the retriever in a DocumentSearchPipeline.
from haystack.pipelines import DocumentSearchPipeline

p_retrieval = DocumentSearchPipeline(bm25_retriever)
res = p_retrieval.run(
    query="Who is the father of Arya Stark?",
    params={"Retriever": {"top_k": 10}},
)
# Print the retrieved documents, passing max_text_len=200 to the printer
print_documents(res, max_text_len=200)

In [None]:
# Or if you want to use a Generator instead of a Reader, you can initialize a GenerativeQAPipeline like this
from haystack.pipelines import GenerativeQAPipeline, FAQPipeline
from haystack.nodes import RAGenerator

# Initialize generator
rag_generator = RAGenerator()

# Generative QA
p_generator = GenerativeQAPipeline(generator=rag_generator, retriever=embedding_retriever)

# The Generator needs the document store to return document embeddings with
# each document, so the flag is switched on only for the duration of the run.
document_store.return_embedding = True
try:
    res = p_generator.run(
        query="Who is the father of Arya Stark?",
        params={"Retriever": {"top_k": 10}},
    )
    print_answers(res, details="minimum")
finally:
    # Always switch it back off - even if the run above fails - so that later
    # pipelines get a cleaner printout and no stale state leaks across cells.
    document_store.return_embedding = False

In [None]:
# Rebuild the extractive QA pipeline by hand with the generic Pipeline class:
# the components initialized earlier are added as nodes of the graph.

# Custom-built extractive QA pipeline: Query -> Retriever -> Reader
p_extractive = Pipeline()
p_extractive.add_node(component=bm25_retriever, name="Retriever", inputs=["Query"])
p_extractive.add_node(component=reader, name="Reader", inputs=["Retriever"])

# Run the hand-built pipeline
res = p_extractive.run(
    query="Who is the father of Arya Stark?",
    params={
        "Retriever": {"top_k": 10},
        "Reader": {"top_k": 5},
    },
)
print_answers(res, details="minimum")

In [None]:
# Pipelines make it easy to ensemble components. This example combines an
# EmbeddingRetriever with the keyword-based BM25Retriever; a JoinDocuments node
# merges the predictions of both retrievers before they reach the reader.

from haystack.nodes import JoinDocuments

# Create ensembled pipeline: both retrievers read the query, their results are joined
p_ensemble = Pipeline()
p_ensemble.add_node(component=bm25_retriever, name="ESRetriever", inputs=["Query"])
p_ensemble.add_node(component=embedding_retriever, name="EmbeddingRetriever", inputs=["Query"])
p_ensemble.add_node(
    component=JoinDocuments(join_mode="concatenate"),
    name="JoinResults",
    inputs=["ESRetriever", "EmbeddingRetriever"],
)
p_ensemble.add_node(component=reader, name="Reader", inputs=["JoinResults"])

# Run pipeline with top_k set per retriever node
res = p_ensemble.run(
    query="Who is the father of Arya Stark?",
    params={
        "EmbeddingRetriever": {"top_k": 5},
        "ESRetriever": {"top_k": 5},
    },
)
print_answers(res, details="minimum")

In [None]:
# Let's see how we can write a custom node and add it to the pipeline. One good example is "Decision Nodes".
# Decision Nodes help you route your data so that only certain branches of your Pipeline are run.
# One popular use case for such query classifiers is routing keyword queries to Elasticsearch and questions to EmbeddingRetriever + Reader.
# With this approach you keep optimal speed and simplicity for keywords while going deep with transformers when it's most helpful.

from haystack import BaseComponent
from typing import Optional

# Always extend BaseComponent when writing custom nodes.
class CustomQueryClassifier(BaseComponent):
    """Decision node that routes a query to one of two outgoing edges.

    A query containing a question mark is sent to ``output_2``; any other
    query is sent to ``output_1``.
    """

    outgoing_edges = 2

    def run(self, query: str):
        """Route a single query.

        :param query: The query text to classify.
        :return: Tuple of an empty output dict and the name of the edge to follow.
        """
        if "?" in query:
            return {}, "output_2"
        return {}, "output_1"

    def run_batch(self, queries: List[str]):
        """Route a batch of queries, applying the same rule as ``run`` to each one.

        :param queries: The query texts to classify.
        :return: Tuple of a dict holding the queries assigned to each edge
                 (under that edge's ``"queries"`` key) and the string ``"split"``.
        """
        split = {"output_1": {"queries": []}, "output_2": {"queries": []}}
        for query in queries:
            edge = "output_2" if "?" in query else "output_1"
            split[edge]["queries"].append(query)
        return split, "split"


# Build the routing pipeline: the classifier's two outputs feed one retriever
# each, and both retrievers feed the same reader node.
p_classifier = Pipeline()
p_classifier.add_node(component=CustomQueryClassifier(), name="QueryClassifier", inputs=["Query"])
p_classifier.add_node(component=bm25_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
p_classifier.add_node(
    component=embedding_retriever,
    name="EmbeddingRetriever",
    inputs=["QueryClassifier.output_2"],
)
p_classifier.add_node(
    component=reader,
    name="QAReader",
    inputs=["ESRetriever", "EmbeddingRetriever"],
)

# A full-sentence question contains "?" and is routed to the dense retriever only
res_1 = p_classifier.run(query="Who is the father of Arya Stark?")
print("Embedding Retriever Results" + "\n" + "=" * 15)
print_answers(res_1)

# A keyword query has no "?" and is routed to the sparse retriever only
res_2 = p_classifier.run(query="Arya Stark father")
print("ES Results" + "\n" + "=" * 15)
print_answers(res_2)