# Deep Dive into a Question Answering Engine with Towhee

In the [previous tutorial](./1_build_question_answering_engine.ipynb), we built and prototyped a proof-of-concept question answering engine. Now, let's feed it a large-scale dataset and deploy it as a microservice with Towhee.

## Preparation

### Install Dependencies

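First, we need to install dependencies such as pymilvus, towhee, fastapi, uvicorn, and nest_asyncio (the last two are used to run the server at the end of this notebook).

In [1]:
! python -m pip -q install pymilvus towhee fastapi uvicorn nest_asyncio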

### Prepare the data

This demo uses a subset of the [InsuranceQA Corpus](https://github.com/shuzi/insuranceQA) (1000 question-answer pairs), which you can download from [GitHub](https://github.com/towhee-io/examples/releases/download/data/question_answer.csv). It is the same dataset as in our previous tutorial, "[Build a Question Answer Engine in Minutes](1_build_question_answering_engine.ipynb)". To make things easy, we repeat the important code blocks below; if you have already downloaded the data, skip to the next section.

- question_answer.csv: a file containing the questions and their answers.

In [2]:
! curl -L https://github.com/towhee-io/examples/releases/download/data/question_answer.csv -O



To look up an answer by its ID, let's first define the following dictionary:

- id_answer: a dictionary that maps each ID to its answer

In [3]:
import pandas as pd

df = pd.read_csv('question_answer.csv')
id_answer = df.set_index('id')['answer'].to_dict()
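If you would rather not depend on pandas for this step, you can build the same mapping with the standard library's `csv` module. The sketch below is minimal and assumes the file has `id` and `answer` columns, as the pandas code above does. `load_id_answer` is a hypothetical helper, and the two-row sample is toy data, not rows from the real file.

```python
import csv
import io

def load_id_answer(f):
    """Build an {id: answer} dict from a CSV file object with 'id' and 'answer' columns."""
    reader = csv.DictReader(f)
    # Cast ids to int so the keys match the integer ids stored in Milvus.
    return {int(row['id']): row['answer'] for row in reader}

# Toy in-memory sample; in practice pass open('question_answer.csv', newline='').
sample = io.StringIO("id,question,answer\n0,What is A?,Answer A\n1,What is B?,Answer B\n")
id_answer_sketch = load_id_answer(sample)
print(id_answer_sketch)  # {0: 'Answer A', 1: 'Answer B'}
```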

### Create Milvus Collection

Before getting started, make sure you have [installed Milvus](https://milvus.io/docs/v2.0.x/install_standalone-docker.md). Next, we define the function `create_milvus_collection`. It creates a collection in Milvus, first dropping any existing collection with the same name. The collection uses the [L2 distance metric](https://milvus.io/docs/v2.0.x/metric.md#Euclidean-distance-L2) and an [IVF_FLAT index](https://milvus.io/docs/v2.0.x/index.md#IVF_FLAT).

In [4]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

connections.connect(host='127.0.0.1', port='19530')

def create_milvus_collection(collection_name, dim):
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)
    
    fields = [
        FieldSchema(name='id', dtype=DataType.INT64, description='ids', is_primary=True, auto_id=False),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='embedding vectors', dim=dim)
    ]
    schema = CollectionSchema(fields=fields, description='question answering')
    collection = Collection(name=collection_name, schema=schema)

    # create IVF_FLAT index for collection.
    index_params = {
        'metric_type':'L2',
        'index_type':"IVF_FLAT",
        'params':{"nlist":2048}
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    return collection
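The collection ranks results by L2 distance, and the serving pipelines later in this notebook normalize the query vectors with `tensor_normalize`. For unit-length vectors, squared L2 distance and cosine similarity are related by `||q - d||^2 = 2 - 2 * cos(q, d)`. Ranking by smallest L2 distance is then the same as ranking by largest cosine similarity, but only if the stored vectors are normalized too. It is therefore worth keeping normalization consistent between the pipelines that load vectors and those that query them. The plain-Python sketch below checks the identity numerically. `normalize`, `l2_squared`, and `cosine` are illustrative helpers, not Towhee or Milvus functions.

```python
import math

def normalize(v):
    """Scale a vector to unit L2 norm (assumed to match what a normalize step does)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def l2_squared(a, b):
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def cosine(a, b):
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

q = normalize([1.0, 2.0, 2.0])
d = normalize([2.0, 1.0, 0.5])
# For unit vectors, ||q - d||^2 equals 2 - 2 * cos(q, d).
print(l2_squared(q, d), 2 - 2 * cosine(q, d))
```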

## Making Our Question Answering Engine Production Ready

To put the question answering engine into production, we need to feed it a large-scale dataset and deploy a microservice to accept incoming queries.

### Improve Performance with Parallel Execution

We can enable parallel execution by calling `set_parallel` within the pipeline, which tells Towhee to process the data in parallel. The example below runs the same DPR embedding pipeline twice: first sequentially, then with `set_parallel(2)`. In our run, the parallel version took about 59 seconds versus about 102 seconds, roughly 1.7 times faster.

In [5]:
import towhee
import time

collection = create_milvus_collection('qa', 768)

t1 = time.time()
dc = (
    towhee.read_csv('question_answer.csv')
      .runas_op['id', 'id'](func=lambda x: int(x))
      .text_embedding.dpr['question', 'vec'](model_name="facebook/dpr-ctx_encoder-single-nq-base")
      .runas_op['vec', 'vec'](func=lambda x: x.squeeze(0))
      .to_milvus['id', 'vec'](collection=collection, batch=25)
)
t2 = time.time()

print("Total time: ", t2-t1)

Total time:  101.92725419998169
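`time.time()` reads the wall clock, which can jump if the system clock is adjusted. For measuring intervals, `time.perf_counter()` is the usual choice. A small context manager can keep the start/stop bookkeeping out of the pipeline cell. `Timer` is a hypothetical helper, and the `sum` call is only a stand-in for the pipeline being timed.

```python
import time

class Timer:
    """Context manager that measures elapsed time with a monotonic, high-resolution clock."""

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.elapsed = time.perf_counter() - self.start
        return False  # do not suppress exceptions raised inside the block

with Timer() as t:
    sum(range(1_000_000))  # stand-in for the pipeline being timed
print(f"Total time: {t.elapsed:.2f}s")
```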


In [6]:
collection_parallel = create_milvus_collection('qa_parallel', 768)

t1 = time.time()
dc = (
    towhee.read_csv('question_answer.csv')
      .runas_op['id', 'id'](func=lambda x: int(x))
      .set_parallel(2)
      .text_embedding.dpr['question', 'vec'](model_name="facebook/dpr-ctx_encoder-single-nq-base")
      .runas_op['vec', 'vec'](func=lambda x: x.squeeze(0))
      .to_milvus['id', 'vec'](collection=collection_parallel, batch=25)
)
t2 = time.time()
print("Total time with parallel:", t2-t1)

Total time with parallel: 58.73357009887695
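Towhee's internal implementation of `set_parallel` is not shown in this tutorial. As a rough standard-library analogy (not Towhee's code), `concurrent.futures` shows the same idea: it runs a per-item function on a fixed number of workers while keeping results in input order. `fake_embed` is a hypothetical stand-in for the DPR call that simply sleeps. Real speedups depend on the workload, so two workers typically give somewhat less than an exact 2x.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_embed(text):
    """Hypothetical stand-in for a model call; sleeps to simulate per-item work."""
    time.sleep(0.05)
    return len(text)

texts = [f"question {i}" for i in range(8)]

start = time.perf_counter()
sequential = [fake_embed(t) for t in texts]
seq_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Executor.map returns results in input order, so rows stay aligned.
    parallel = list(pool.map(fake_embed, texts))
par_time = time.perf_counter() - start

print(sequential == parallel, f"speedup: {seq_time / par_time:.1f}x")
```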


### Exception Safe Execution

With large-scale data, some records are likely to be malformed and cause errors. Typically, we don't want such errors to break the production system. Instead, the data pipeline should continue to process the rest of the data and report the errors.

Towhee supports an exception-safe execution mode that lets the pipeline continue past exceptions and represents the failed results with `Empty` values. You can then decide how to handle the `Empty` values at the end of the pipeline. In the query below, the second input is `None`. Because the pipeline calls `exception_safe()`, it only logs an error message for that input instead of terminating. `drop_empty()` then removes the resulting `Empty` value before the results are collected.

In [7]:
dc = (
    towhee.dc(['Is  Disability  Insurance  Required  By  Law?', None])
      .exception_safe()
      .text_embedding.dpr(model_name="facebook/dpr-ctx_encoder-single-nq-base")
      .runas_op(func=lambda x: x.squeeze(0))
      .milvus_search(collection='qa', limit=3)
      .runas_op(func=lambda res: [id_answer[x.id] for x in res])
      .drop_empty()
      .to_list()
)

2022-05-31 14:53:14,397 - 4554221056 - dpr.py-dpr:46 - ERROR: Invalid input for the tokenizer: facebook/dpr-ctx_encoder-single-nq-base
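Conceptually, the exception-safe mode wraps each step so that a failure becomes a placeholder instead of an exception that aborts the run. The plain-Python sketch below illustrates that idea. `Empty`, `safe_map`, and `drop_empty` here are illustrative stand-ins, not Towhee's actual classes or implementation, and `len(s.split())` stands in for a real processing step.

```python
class Empty:
    """Hypothetical marker for a value whose processing raised an exception."""

    def __init__(self, error):
        self.error = error

    def __repr__(self):
        return f"Empty({self.error!r})"

def safe_map(func, items):
    """Apply func to each item; replace failures (and upstream Empty values) with Empty."""
    out = []
    for item in items:
        if isinstance(item, Empty):
            out.append(item)  # propagate an earlier failure without calling func
            continue
        try:
            out.append(func(item))
        except Exception as e:  # keep going instead of aborting the whole batch
            out.append(Empty(e))
    return out

def drop_empty(items):
    """Remove the Empty placeholders at the end of the pipeline."""
    return [x for x in items if not isinstance(x, Empty)]

queries = ['Is  Disability  Insurance  Required  By  Law?', None]
word_counts = safe_map(lambda s: len(s.split()), queries)  # None.split() raises AttributeError
print(word_counts)
print(drop_empty(word_counts))  # [6]
```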


## Deploy as a Microservice

The data pipeline used in our experiments can be converted to a function with `towhee.api` and `as_function()`, as presented in the [previous tutorial](./1_build_question_answering_engine.ipynb). We can also convert the data pipeline into a RESTful API with `serve()`, which generates FastAPI services from Towhee pipelines.

### Insert Question-Answer Data

In [8]:
import time
import json
import towhee
from fastapi import FastAPI
from pymilvus import connections, Collection

app = FastAPI()
connections.connect(host='127.0.0.1', port='19530')
milvus_collection = Collection('qa')

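# Parse an incoming JSON string such as '{"Q": "...", "A": "..."}', store the answer
# under a new timestamp-based id, and pass the question and id downstream.
@towhee.register(name='get_qa_id')
def get_qa_id(text):
    qa = json.loads(text)
    question = qa['Q']
    answer = qa['A']
    # Use the current time (in units of 0.1 ms) as a simple id for the new entry.
    timestamp = int(time.time()*10000)
    # Keep the answer in memory so /search can map the Milvus id back to it.
    id_answer[timestamp] = answer
    return question, timestamp

# Custom operator that inserts one (id, vector) row into the Milvus collection.
@towhee.register(name='milvus_insert')
class MilvusInsert:
    def __init__(self, collection):
        self.collection = collection

    def __call__(self, *args, **kwargs):
        # pymilvus expects column-based data: one list per field, in schema order.
        data = []
        for iterable in args:
            data.append([iterable])
        mr = self.collection.insert(data)
        # Make sure the collection is loaded into memory so it can be searched.
        self.collection.load()
        return str(mr)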

with towhee.api['text']() as api:
    app_insert = (
        api.get_qa_id['text', ('question', 'id')]()
        .text_embedding.dpr['question', 'vec'](model_name="facebook/dpr-ctx_encoder-single-nq-base")
        .runas_op['vec', 'vec'](func=lambda x: x.squeeze(0))
        .tensor_normalize['vec', 'vec']()
        .milvus_insert[('id', 'vec'), 'res'](collection=milvus_collection)
        .select['id']()
        .serve('/insert', app)
    )
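In the pymilvus 2.x versions this tutorial targets, `Collection.insert` accepts column-based data: a list with one inner list per schema field, in schema order. `MilvusInsert` above builds a one-row batch by wrapping each argument in its own list. To buffer several QA pairs and insert them together, you could use a helper like the hypothetical `rows_to_columns` below, which transposes rows into that layout. It is plain Python, and the vectors are short toy lists rather than real 768-dimensional embeddings.

```python
def rows_to_columns(rows):
    """Transpose [(id, vec), ...] into [[id, ...], [vec, ...]] (one list per field)."""
    if not rows:
        return []
    return [list(column) for column in zip(*rows)]

rows = [(1001, [0.1, 0.2]), (1002, [0.3, 0.4])]
columns = rows_to_columns(rows)
print(columns)  # [[1001, 1002], [[0.1, 0.2], [0.3, 0.4]]]
# With a real collection, this would be passed as collection.insert(columns).
```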

### Search Similar Questions

In [9]:
with towhee.api() as api:
    app_search = (
        api.text_embedding.dpr(model_name="facebook/dpr-ctx_encoder-single-nq-base")
          .runas_op(func=lambda x: x.squeeze(0))
          .tensor_normalize()
          .milvus_search(collection=milvus_collection, limit=1)
          .runas_op(func=lambda res: [id_answer[x.id] for x in res])
          .serve('/search', app)
)
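The last `runas_op` looks up each hit's `id` in the in-memory `id_answer` dict. `get_qa_id` only updates that dict in the current process, while Milvus keeps the vectors. An id that exists in Milvus but not in this process (for example, one inserted before a restart) would therefore raise a `KeyError`. A more forgiving lookup could use `dict.get`. In the sketch below, the `Hit` namedtuple is a hypothetical stand-in for real Milvus hit objects; only its `.id` attribute is used. `hits_to_answers` is likewise a hypothetical helper.

```python
from collections import namedtuple

# Hypothetical stand-in for a Milvus search hit.
Hit = namedtuple('Hit', ['id', 'distance'])

def hits_to_answers(hits, id_answer, default=None):
    """Map search hits to answers, tolerating ids missing from the local dict."""
    return [id_answer.get(hit.id, default) for hit in hits]

toy_answers = {7: 'Answer seven'}
hits = [Hit(id=7, distance=0.12), Hit(id=42, distance=0.35)]
print(hits_to_answers(hits, toy_answers, default='<unknown id>'))
# ['Answer seven', '<unknown id>']
```

In the pipeline, this would replace the lambda in the final step, e.g. `.runas_op(func=lambda res: hits_to_answers(res, id_answer))`.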

### Count Entities

In [10]:
with towhee.api() as api:
    app_count = (
        api.map(lambda _: milvus_collection.num_entities)
        .serve('/count', app)
    )

### Start Server

Finally, start the FastAPI server. It exposes three services, `/insert`, `/search`, and `/count`, which you can test with the following commands:

> Note that the data sent to `/insert` must be a JSON string containing both the question and the answer, e.g. '{"Q": "The question...?", "A": "The answer..."}'

```bash
# ask a question
$ curl -X POST "http://0.0.0.0:8000/search"  --data "Is  Disability  Insurance  Required  By  Law?"

# insert qa data
$ curl -X POST "http://0.0.0.0:8000/insert"  --data '{"Q": "What is China RMB rate?", "A": "1.00 CNY->0.149286US Dollar, 6.698573 CNY-> 1.00 US Dollar and 1.00 CNY->0.139041 Euro, 7.192143 CNY -> 1.00 Euro on May 30, 2022."}'

# count the collection
$ curl -X POST "http://0.0.0.0:8000/count"
```
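You can also exercise the endpoints from Python instead of the shell, using the standard library's `urllib.request` to send the same raw POST bodies as the `curl` commands. Run this from a separate Python session, because `uvicorn.run` blocks the notebook. The sketch is minimal and assumes the server is reachable at `127.0.0.1:8000`; `build_request` and `call` are hypothetical helper names. Like `curl --data`, `urllib` sends a body with `Content-Type: application/x-www-form-urlencoded` unless you set another header.

```python
import json
import urllib.request

BASE_URL = 'http://127.0.0.1:8000'  # assumed host/port, matching the uvicorn call below

def build_request(path, body=b''):
    """Build a POST request with a raw body, mirroring `curl -X POST --data ...`."""
    if isinstance(body, str):
        body = body.encode('utf-8')
    return urllib.request.Request(BASE_URL + path, data=body, method='POST')

def call(path, body=b''):
    """Send the request and return the decoded response body (the server must be running)."""
    with urllib.request.urlopen(build_request(path, body)) as resp:
        return resp.read().decode('utf-8')

qa = json.dumps({'Q': 'What is China RMB rate?', 'A': 'Example answer.'})
req = build_request('/insert', qa)
print(req.get_method(), req.full_url)
# With the server running:
# call('/search', 'Is  Disability  Insurance  Required  By  Law?')
# call('/insert', qa)
# call('/count')
```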

In [11]:
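import uvicorn
import nest_asyncio

# Jupyter already runs an asyncio event loop; nest_asyncio allows nested loops so uvicorn can start inside it.
nest_asyncio.apply()
# This call blocks the notebook until the server is stopped (e.g., by interrupting the kernel).
uvicorn.run(app=app, host='0.0.0.0', port=8000)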

INFO:     Started server process [89586]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:58069 - "POST /search HTTP/1.1" 200 OK
INFO:     127.0.0.1:58070 - "POST /insert HTTP/1.1" 200 OK
INFO:     127.0.0.1:58071 - "POST /count HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [89586]
