# Deep Dive into Text-Video Retrieval Engine

In the [previous tutorial](./1_text_video_retrieval_engine.ipynb), we learned how to build a text-video retrieval engine. Now let's make the solution more suitable for production.


## Preparation  

Let's first recap the preparation steps:
1. Install packages
2. Prepare data
3. Start Milvus

### Install packages

Make sure you have installed the required Python packages:

| package |
| -- |
| pymilvus |
| towhee |
| towhee.models |
| pillow |
| ipython |
| fastapi |

In [1]:
# ! python -m pip install -q pymilvus towhee towhee.models pillow ipython fastapi

### Prepare the data

First, we need to prepare the dataset and the Milvus environment.   

[MSR-VTT (Microsoft Research Video to Text)](https://www.microsoft.com/en-us/research/publication/msr-vtt-a-large-video-description-dataset-for-bridging-video-and-language/) is a dataset for open-domain video captioning that consists of 10,000 video clips.  

Download the MSR-VTT-1kA test set, which contains just 1k videos, from [Google Drive](https://drive.google.com/file/d/1cuFpHiK3jV9cZDKcuGienxTg1YQeDs-w/view?usp=sharing) and unzip it.  
The caption sentences for the videos are in `./MSRVTT_JSFUSION_test.csv`.

The data is organized as follows:
- **test_1k_compress:** 1k compressed test videos in MSR-VTT-1kA.
- **MSRVTT_JSFUSION_test.csv:** a CSV file with the columns ***key,vid_key,video_id,sentence*** for each video and its caption text.

Let's take a quick look:

In [2]:
# ! curl -L https://github.com/towhee-io/examples/releases/download/data/text_video_search.zip -O
# ! unzip -q -o text_video_search.zip

In [3]:
import pandas as pd
import os

# raw_video_path = './test_1k_compress' # 1k test video path.
raw_video_path = os.path.join(os.path.abspath('.'), './test_1k_compress')

test_csv_path = './MSRVTT_JSFUSION_test.csv' # 1k video caption csv.

test_sample_csv_path = './MSRVTT_JSFUSION_test_sample.csv'

sample_num = 1000 # you can change this sample_num to be smaller, so that this notebook will be faster.
test_df = pd.read_csv(test_csv_path)
print('length of all test set is {}'.format(len(test_df)))
sample_df = test_df.sample(sample_num, random_state=42)

sample_df['video_path'] = sample_df.apply(lambda x:os.path.join(raw_video_path, x['video_id']) + '.mp4', axis=1)

sample_df.to_csv(test_sample_csv_path)
print('random sample {} examples'.format(sample_num))

df = pd.read_csv(test_sample_csv_path)

length of all test set is 1000
random sample 1000 examples
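
Because each `video_path` is built from the CSV rather than read from disk, it can be worth checking that the files actually exist before running a long pipeline. The following is a minimal sketch using only the standard library; `find_missing` is a hypothetical helper, not part of Towhee or pandas, and the temporary files only stand in for real videos.

```python
import os
import tempfile

def find_missing(paths):
    """Return the entries that are not existing files on disk."""
    return [p for p in paths if not (isinstance(p, str) and os.path.isfile(p))]

# Demonstrate on a temporary directory with one real file and one missing file.
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, 'video7579.mp4')
    open(good, 'wb').close()  # create an empty placeholder file
    bad = os.path.join(tmp, 'video0000.mp4')  # never created
    missing = find_missing([good, bad])
    print(missing)
```

With the DataFrame above, the same helper could be called as `find_missing(df['video_path'])`.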


Define a helper function to convert videos to GIFs so that we can take a look at these video-text pairs.   

Take a look at the ground-truth video-text pairs.

### Start Milvus

Before getting started, please make sure you have [installed Milvus](https://milvus.io/docs/v2.0.x/install_standalone-docker.md). Let's first define a helper that creates a video retrieval collection using the [L2 distance metric](https://milvus.io/docs/v2.0.x/metric.md#Euclidean-distance-L2) and an [IVF_FLAT index](https://milvus.io/docs/v2.0.x/index.md#IVF_FLAT).

In [4]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

connections.connect(host='127.0.0.1', port='19530')

def create_milvus_collection(collection_name, dim):
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)
    
    fields = [
        FieldSchema(name='id', dtype=DataType.INT64, description='ids', is_primary=True, auto_id=False),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='embedding vectors', dim=dim)
    ]
    schema = CollectionSchema(fields=fields, description='video retrieval')
    collection = Collection(name=collection_name, schema=schema)

    # create IVF_FLAT index for collection.
    index_params = {
        'metric_type': 'L2',  # 'IP' (inner product) is the alternative
        'index_type': 'IVF_FLAT',
        'params': {'nlist': 2048}
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    return collection
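
The index parameters above choose `L2` and mention `IP` as an alternative. As a small illustration of how the two relate, the sketch below uses plain Python with toy 3-dimensional vectors. It assumes the vectors are unit-normalized, which is an assumption made here and not something this tutorial states about the embeddings. Under that assumption the squared L2 distance equals `2 - 2 * inner_product`, so both metrics rank candidates in the same order.

```python
import math

def l2_distance(a, b):
    """Euclidean (L2) distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def inner_product(a, b):
    """Inner (dot) product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

# Toy data (hypothetical, for illustration only).
query = normalize([1.0, 2.0, 2.0])
candidates = {
    'a': normalize([1.0, 2.0, 1.0]),
    'b': normalize([-1.0, 0.5, 0.0]),
    'c': normalize([2.0, 4.0, 4.0]),  # same direction as the query
}

# Smaller L2 distance means closer; larger inner product means closer.
by_l2 = sorted(candidates, key=lambda k: l2_distance(query, candidates[k]))
by_ip = sorted(candidates, key=lambda k: -inner_product(query, candidates[k]))
print(by_l2, by_ip)
```

For vectors that are not normalized, the two metrics can disagree, so the metric should match how the embeddings were produced.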

## Parallel Execution

We can enable parallel execution by simply calling `set_parallel` within the pipeline, which tells Towhee to process the data in parallel. The code below enables parallel execution for the example above. For 1,000 videos, the wall time drops by about 13% (1min 9s in this notebook vs. 1min 19s in the [previous tutorial](./1_text_video_retrieval_engine.ipynb)).

In [5]:
%%time
import os
import towhee
import numpy as np

device = 'cuda:2'
# device = 'cpu'

collection = create_milvus_collection('text_video_retrieval', 512)

dc = (
    towhee.read_csv(test_sample_csv_path)
      .runas_op['video_id', 'id'](func=lambda x: int(x[-4:]))
      .set_parallel(2)
      .video_decode.ffmpeg['video_path', 'frames'](sample_type='uniform_temporal_subsample', args={'num_samples': 12})
      .runas_op['frames', 'frames'](func=lambda x: [y for y in x])
      .video_text_embedding.clip4clip['frames', 'vec'](model_name='clip_vit_b32', modality='video', device=device)
      .to_milvus['id', 'vec'](collection=collection, batch=30)
)

CPU times: user 28min 59s, sys: 1h 30min 3s, total: 1h 59min 3s
Wall time: 1min 9s
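
To make the idea behind `set_parallel(2)` more concrete, here is a minimal sketch using a two-worker thread pool from the Python standard library. It is only an analogy and does not show how Towhee implements parallelism internally; `slow_embed` is a hypothetical stand-in for the per-video decode and embed steps.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_embed(video_id):
    """Stand-in for an expensive per-video step (decode + embed)."""
    time.sleep(0.05)  # pretend to wait on I/O
    return int(video_id[-4:])  # same id extraction as in the pipeline

video_ids = ['video7579', 'video7725', 'video9258', 'video7365']

# Process the items one after another.
start = time.perf_counter()
sequential = [slow_embed(v) for v in video_ids]
sequential_time = time.perf_counter() - start

# Process the items with two workers; map() keeps the input order.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    parallel = list(pool.map(slow_embed, video_ids))
parallel_time = time.perf_counter() - start

print(sequential == parallel)
print(f'sequential: {sequential_time:.2f}s, parallel: {parallel_time:.2f}s')
```

The results are identical in both runs; only the elapsed time differs, and the actual gain depends on how much of the work can overlap.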


## Exception Safe

Let's build an `exception_df` that contains an invalid value. Here we replace one `video_path` with an invalid path, which would normally raise an exception.

In [6]:
exception_df = pd.read_csv(test_sample_csv_path)
exception_df.loc[0, 'video_path'] = 666
print(type(exception_df.loc[0, 'video_path']))
exception_csv_path = './MSRVTT_JSFUSION_test_exception.csv'
exception_df.to_csv(exception_csv_path)
exception_df.head()

<class 'int'>


| Unnamed: 0.1 | Unnamed: 0 | key | vid_key | video_id | sentence | video_path |
| -- | -- | -- | -- | -- | -- | -- |
| 0 | 521 | ret521 | msr7579 | video7579 | a girl wearing red top and black trouser is pu... | 666 |
| 1 | 737 | ret737 | msr7725 | video7725 | young people sit around the edges of a room cl... | /home/data1/zhangchen_workspace/video-retrieva... |
| 2 | 740 | ret740 | msr9258 | video9258 | a person is using a phone | /home/data1/zhangchen_workspace/video-retrieva... |
| 3 | 660 | ret660 | msr7365 | video7365 | cartoon people are eating at a restaurant | /home/data1/zhangchen_workspace/video-retrieva... |
| 4 | 411 | ret411 | msr8068 | video8068 | a woman on a couch talks to a a man | /home/data1/zhangchen_workspace/video-retrieva... |


With large-scale data, some bad records are likely to cause errors. Typically, users don't want such errors to break the system in production, so the pipeline should continue processing the remaining videos and report the broken ones.

Towhee supports an `exception-safe` execution mode that allows the pipeline to continue on exceptions and represents them with `Empty` values. The user can choose how to deal with the empty values at the end of the pipeline. In the run below, the CSV contains 1,000 rows, one of which has an invalid `video_path`. With `exception_safe`, the pipeline prints the ERROR but does NOT terminate. As the results show, `drop_empty` removes the empty entries.

In [7]:
%%time
import os
import towhee
import numpy as np

device = 'cuda:2'
# device = 'cpu'

collection = create_milvus_collection('text_video_retrieval', 512)

dc = (
    towhee.read_csv(exception_csv_path)
      .exception_safe()
      .runas_op['video_id', 'id'](func=lambda x: int(x[-4:]))
      .video_decode.ffmpeg['video_path', 'frames'](sample_type='uniform_temporal_subsample', args={'num_samples': 12})
      .runas_op['frames', 'frames'](func=lambda x: [y for y in x])
      .video_text_embedding.clip4clip['frames', 'vec'](model_name='clip_vit_b32', modality='video', device=device)
      .drop_empty()
      .to_milvus['id', 'vec'](collection=collection, batch=30)
)

CPU times: user 17min 36s, sys: 50min 1s, total: 1h 7min 37s
Wall time: 1min 7s


In [10]:
print('Total number of inserted data is {}, while sample_num is {}. The exception data is dropped.'.format(collection.num_entities, sample_num))

Total number of inserted data is 999, while sample_num is 1000. The exception data is dropped.
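
The exception-safe behaviour described in this section can be sketched in plain Python. This is a simplified illustration and not Towhee's implementation: `Empty`, `safe_map`, `drop_empty`, and `fake_decode` are hypothetical names defined here, and `fake_decode` only mimics a step that fails on a non-string path such as `666`.

```python
import os

class Empty:
    """Placeholder for a row whose processing raised an exception."""
    def __init__(self, error):
        self.error = error

def safe_map(func, items):
    """Apply func to each item; on failure, log it and keep an Empty marker."""
    results = []
    for item in items:
        try:
            results.append(func(item))
        except Exception as exc:
            print(f'ERROR while processing {item!r}: {exc}')
            results.append(Empty(exc))
    return results

def drop_empty(items):
    """Remove the Empty markers left behind by failed rows."""
    return [item for item in items if not isinstance(item, Empty)]

def fake_decode(video_path):
    """Stand-in for video decoding; raises TypeError for non-path input."""
    return os.path.basename(video_path)

paths = ['/data/video7579.mp4', 666, '/data/video9258.mp4']
decoded = safe_map(fake_decode, paths)  # the bad row becomes Empty
kept = drop_empty(decoded)              # only the good rows remain
print(kept)
```

The bad row is reported and skipped while the other rows go through, which mirrors the 999 inserted entities out of 1,000 samples above.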
