## Preparation 制备

### Install packages 安装软件包

In [None]:
! python -m pip install -q towhee==1.1.3 towhee.models==1.1.3 pillow==10.4.0 ipython==8.26.0 gradio==4.41.0

### Prepare the data 准备数据

首先，我们需要准备数据集和 Milvus 环境。

MSR-VTT（Microsoft Research 视频到文本）是开放域视频字幕的数据集，由 10,000 个视频剪辑组成。
从 google drive 下载 MSR-VTT-1kA 测试集并解压缩，其中仅包含 1k 视频。
视频字幕文本句子信息以 ./MSRVTT_JSFUSION_test.csv 为单位。

数据按如下方式组织：
-- test_1k_compress：MSR-VTT-1kA 格式的 1k 压缩测试视频。
-- MSRVTT_JSFUSION_test.csv：一个 csv 文件，其中包含每个视频和字幕文本的 key、vid_key、video_id、sentence。

让我们快速浏览一下


In [None]:
! curl -L https://github.com/towhee-io/examples/releases/download/data/text_video_search.zip -O
! unzip -q -o text_video_search.zip

In [1]:
import pandas as pd
import os

raw_video_path = './test_1k_compress' # 1k test video path.
test_csv_path = './MSRVTT_JSFUSION_test.csv' # 1k video caption csv.

test_sample_csv_path = './MSRVTT_JSFUSION_test_sample.csv'

sample_num = 1000 # you can change this sample_num to be smaller, so that this notebook will be faster.
test_df = pd.read_csv(test_csv_path)
print('length of all test set is {}'.format(len(test_df)))
sample_df = test_df.sample(sample_num, random_state=42)

sample_df['video_path'] = sample_df.apply(lambda x:os.path.join(raw_video_path, x['video_id']) + '.mp4', axis=1)

sample_df.to_csv(test_sample_csv_path)
print('random sample {} examples'.format(sample_num))

df = pd.read_csv(test_sample_csv_path)

df[['video_id', 'video_path', 'sentence']].head()

length of all test set is 1000
random sample 1000 examples


Unnamed: 0,video_id,video_path,sentence
0,video7579,./test_1k_compress\video7579.mp4,a girl wearing red top and black trouser is pu...
1,video7725,./test_1k_compress\video7725.mp4,young people sit around the edges of a room cl...
2,video9258,./test_1k_compress\video9258.mp4,a person is using a phone
3,video7365,./test_1k_compress\video7365.mp4,cartoon people are eating at a restaurant
4,video8068,./test_1k_compress\video8068.mp4,a woman on a couch talks to a a man


定义一些辅助函数将视频转换为 gif，以便我们可以查看这些视频文本对。

In [None]:
from IPython import display
from pathlib import Path
from towhee import pipe, ops
from PIL import Image

def display_gif(video_path_list, text_list):
    html = ''
    for video_path, text in zip(video_path_list, text_list):
        html_line = '<img src="{}"> {} <br/>'.format(video_path, text)
        html += html_line
    return display.HTML(html)

    
def convert_video2gif(video_path, output_gif_path, num_samples=16):
    p = (
        pipe.input('video_path')
        .map('video_path', 'video_frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': num_samples}))
        .output('video_frames')
    )
    frames = p(video_path).to_list()[0][0]
    imgs = [Image.fromarray(frame) for frame in frames]
    imgs[0].save(fp=output_gif_path, format='GIF', append_images=imgs[1:], save_all=True, loop=0)


def display_gifs_from_video(video_path_list, text_list, tmpdirname = './tmp_gifs'):
    Path(tmpdirname).mkdir(exist_ok=True)
    gif_path_list = []
    for video_path in video_path_list:
        video_name = str(Path(video_path).name).split('.')[0]
        gif_path = Path(tmpdirname) / (video_name + '.gif')
        convert_video2gif(video_path, gif_path)
        gif_path_list.append(gif_path)
    return display_gif(gif_path_list, text_list)

查看 ground-truth 视频-文本对。

In [4]:
# sample_show_df = sample_df.sample(5, random_state=42)
sample_show_df = sample_df[:5]
video_path_list = sample_show_df['video_path'].to_list()
text_list = sample_show_df['sentence'].to_list()
tmpdirname = './tmp_gifs'
display_gifs_from_video(video_path_list, text_list, tmpdirname=tmpdirname)

## 创建 Milvus Collection

在开始之前，请确保你已经启动了 Milvus 服务。此笔记本使用 milvus 2.2.10 和 pymilvus 2.2.11。

In [5]:
! python -m pip install -q pymilvus==2.2.11

让我们首先创建一个使用 L2 距离指标和 IVF_FLAT 索引的视频检索集合。

In [None]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

connections.connect(host='127.0.0.1', port='19530')

def create_milvus_collection(collection_name, dim):
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)
    
    fields = [
    FieldSchema(name='id', dtype=DataType.INT64, descrition='ids', is_primary=True, auto_id=False),
    FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, descrition='embedding vectors', dim=dim)
    ]
    schema = CollectionSchema(fields=fields, description='video retrieval')
    collection = Collection(name=collection_name, schema=schema)

    # create IVF_FLAT index for collection.
    index_params = {
        'metric_type':'L2', #IP
        'index_type':"IVF_FLAT",
        'params':{"nlist":2048}
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    return collection

In [None]:
collection = create_milvus_collection('text_video_retrieval', 512)

## 文本-视频检索

在本节中，我们将展示如何使用 Milvus 构建我们的文本视频检索引擎。文本视频检索背后的基本思想是使用 Transformer 网络从视频中提取嵌入并存储在 Milvus 中，然后使用另一个 Transformer 网络获取文本嵌入并与存储在 Milvus 中的文本进行比较。

我们使用 Towhee，这是一个允许创建数据处理管道的机器学习框架。Towhee 还提供了预定义的运算符，用于在 Milvus 中实现 insert 和 query 操作。

### 将视频嵌入加载到 Milvus 中

我们首先使用 CLIP4Clip 模型从图像中提取嵌入向量，并将嵌入向量插入 Milvus 进行索引。Towhee 提供了一个方法链接式 API，以便用户可以将数据处理管道与运算符组装在一起。

CLIP4Clip 是一种基于 CLIP （ViT-B） 的视频文本检索模型。具有预训练权重的 towhee clip4clip 算子可以通过几段代码轻松提取视频嵌入和文本嵌入。

![base.png](base.png)

在开始运行 clip4clip 操作器之前，你应该确保你已经安装了 make git 和 git-lfs。

对于 git（如果你已经安装了，就跳过它）：
sudo apt-get install git

对于 git-lfs（必须安装它才能在后端下载 checkpoint 权重）：
sudo apt-get install git-lfs
git lfs install

In [None]:
%%time
import os
from towhee import ops, pipe, register
from towhee.operator import PyOperator


def read_csv(csv_file):
    import csv
    with open(csv_file, 'r', encoding='utf-8-sig') as f:
        data = csv.DictReader(f)
        for line in data:
            yield int(line['video_id'][len('video'):]), line['video_path']


dc = (
    pipe.input('csv_file')
    .flat_map('csv_file', ('video_id', 'video_path'), read_csv)
    .map('video_path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 12}))
    .map('frames', 'vec', ops.video_text_embedding.clip4clip(model_name='clip_vit_b32', modality='video', device='cuda:1'))
    .map(('video_id', 'vec'), (), ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='text_video_retrieval'))
    .output('video_id')
)

In [None]:
dc(test_sample_csv_path)
collection.load()

In [None]:
print('Total number of inserted data is {}.'.format(collection.num_entities))

以下是代码每行的详细说明：
-- read_csv(test_sample_csv_path) ：从 CSV 文件中读取表格数据;
-- ops.video_decode.ffmpeg： 对视频进行统一子采样，然后得到视频中的图片列表，这些图片是 clip4clip 模型的输入;
-- ops.video_text_embedding.clip4clip(model_name='clip_vit_b32', modality='video') ：从视频中子采样的图像中提取嵌入特征，然后将它们汇集到时间维度中，这表示。
-- ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='text_video_retrieval') ：在 Milvus 中插入视频嵌入功能;

In [None]:
%%time

def read_csv(csv_file):
    import csv
    with open(csv_file, 'r', encoding='utf-8-sig') as f:
        data = csv.DictReader(f)
        for line in data:
            yield line['video_id'], line['sentence']

dc_search = (
    pipe.input('csv_file')
    .flat_map('csv_file', ('video_id', 'sentence'), read_csv)
    .map('sentence', 'vec', ops.video_text_embedding.clip4clip(model_name='clip_vit_b32', modality='text', device='cuda:1'))
    .map('vec', 'top10_raw_res', 
         ops.ann_search.milvus_client(
             host='127.0.0.1', port='19530', collection_name='text_video_retrieval', limit=10)
        )
    .map('top10_raw_res', ('top1', 'top5', 'top10'), lambda x: (x[:1], x[:5], x[:10]))
    .map('video_id', 'ground_truth', lambda x: x[len('video'):])
    .output('video_id', 'sentence', 'ground_truth', 'top1', 'top5', 'top10')
)

In [None]:
from towhee import DataCollection

ret = DataCollection(dc_search(test_sample_csv_path))
ret.show()

### Evaluation 评估
我们已经完成了文本视频检索引擎的核心功能。但是，我们不知道它是否实现了合理的性能。我们需要根据 Ground Truth 评估检索引擎，以便我们知道是否有任何改进的余地。
在本节中，我们将使用 recall@topk 评估文本视频检索的强度：
Recall@topk 是在前 k 项推荐中找到的相关项的比例。假设我们在 10 时计算召回率，发现它在我们的前 10 个推荐系统中为 40%。这意味着相关项目总数的 40% 显示在前 k 个结果中。

In [None]:
def mean_hit_ratio(actual, *predicteds):
    rets = []
    for predicted in predicteds:
        ratios = []
        for act, pre in zip(actual, predicted):
            hit_num = len(set(act) & set(pre))
            ratios.append(hit_num / len(act))
        rets.append(sum(ratios) / len(ratios))
    return rets

def get_label_from_raw_data(data):
    ret = []
    for item in data:
        ret.append(item[0])
    return ret


ev = (
    pipe.input('dc_data')
    .flat_map('dc_data', 'data', lambda x: x)
    .map('data', ('ground_truth', 'top1', 'top5', 'top10'), 
         lambda x: ([int(x.ground_truth)], 
                    get_label_from_raw_data(x.top1), 
                    get_label_from_raw_data(x.top5), 
                    get_label_from_raw_data(x.top10))
        )
    .window_all(('ground_truth', 'top1', 'top5', 'top10'), ('top1_mean_hit_ratio', 'top5_mean_hit_ratio', 'top10_mean_hit_ratio'), mean_hit_ratio)
    .output('top1_mean_hit_ratio', 'top5_mean_hit_ratio', 'top10_mean_hit_ratio')
)

DataCollection(ev(ret)).show()

此结果与论文中表示的召回率指标几乎相同。您可以在 paperwithcode 中找到有关指标的更多详细信息。

## Release a Showcase 发布 Showcase
我们已经学会了如何构建反向视频搜索引擎。现在是时候添加一些界面并发布一个展示了。Towhee 提供了 towhee.api（） 来将数据处理管道包装为带有 .as_function（） 的函数。因此，我们可以使用 Gradio 构建一个具有此milvus_search_function的快速演示。

In [None]:
import gradio

show_num = 3

milvus_search_pipe = (
    pipe.input('sentence')
    .map('sentence', 'vec', ops.video_text_embedding.clip4clip(model_name='clip_vit_b32', modality='text', device='cpu'))
    .map('vec', 'rows', 
         ops.ann_search.milvus_client(
             host='127.0.0.1', port='19530', collection_name='text_video_retrieval', limit=show_num)
    )
    .map('rows', 'videos_path',
         lambda rows: (os.path.join(raw_video_path, 'video' + str(r[0]) + '.mp4') for r in rows))
    .output('videos_path')
)


def milvus_search_function(text):
    return milvus_search_pipe(text).to_list()[0][0]

# print(milvus_search_function('a girl wearing red top and black trouser is putting a sweater on a dog'))


interface = gradio.Interface(milvus_search_function, 
                             inputs=[gradio.Textbox()],
                             outputs=[gradio.Video(format='mp4') for _ in range(show_num)]
                            )

interface.launch(inline=True, share=True)