# Milvus를 활용한 임베딩 데이터 추출 및 저장

In [1]:
import os
from dotenv import load_dotenv
load_dotenv()

True

# VectorDB 와 Milvus
Milvus work as vector store to enable quick document queries. We compute document embeddings using a small BERT model for semantic search.
Milvus 는 VectorDB로 주어진 문서를 자연어 임베딩을 기반으로 쿼리를 실행할 수 있도록 만들어 졌습니다. 이번 시간에는 자연어 모델인 BERT 모델의 소형 버전을 활용하여 시멘틱 서치를 하기 위한 준비작업을 해보겠습니다.


### 데이터셋 다운로드
"wikihow.csv" 데이터셋을 다운로드 받아 지정된 곳에 저장하세요. 다운로드 링크: `https://ibm.box.com/s/8nvanf974t35d89cmibk75e3gc6d1pbo` [TOBECHANGED]

In [2]:

WH_PATH = "wikihow.csv"
WH_PATH

'wikihow.csv'

### 데이터 로딩 및 데이터 전처리
현재 사용할 데이터에는 다음과 같은 문제가 있습니다:
- titles 데이터가 sectionLabel 데이터로 등록된 경우
- 아포스트로피와 같은 문장 부호의 잘못된 사용 사례
- 가짜 번호로 끝나는 일부 제목

In [3]:
import pandas
doc = pandas.read_csv(WH_PATH)

In [4]:
doc_indexed = doc.set_index(['title', 'headline']).sort_index()

In [5]:
doc_indexed.tail()

Unnamed: 0_level_0,Unnamed: 1_level_0,overview,text,sectionLabel
title,headline,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
How to Zydeco,\nTry a side step.,Zydeco dancing is type of Cajun dancing perfo...,If you already have the rock step down (or ev...,Adding Movement
How to Zydeco,\nTry the open position.,Zydeco dancing is type of Cajun dancing perfo...,"The open position is, as it sounds, much more...",Learning the Closed and Open Position
How to Zydeco,\nUse a rock step.,Zydeco dancing is type of Cajun dancing perfo...,"Often, you'll just be shifting your weight ba...",Adding Movement
How to Zydeco,\nUse dance techniques for the extra beat.,Zydeco dancing is type of Cajun dancing perfo...,It can be hard to remember to hold for the ex...,Learning the Beat
,\nInsert the following into your <head> section:\n\n\n\n\n\n,Do you want to change the user's cursor when ...,"Steps,Tips,Related wikiHows",How to Set Cursors for Webpage Links


### Milvus DB connection 
Python에서 MilvusDB 연결하기 위해서는 공식 라이브러리인 pymilvus를 사용할 예정입니다. 혹은 LangChain의 Milvus 벡터스토어 클래스를 사용하여 문서 인스턴스를 추가할 수도 있습니다. 

In [6]:
# Milvus requires a connection for all operations. Remember to disconnect at the end.

from pymilvus import connections
connections.connect(
  alias="default",
  host="127.0.0.1", # YOUR IP
  port="19530"      # YOUR PORT
)

### Milvus store에 DB schema 선언
만약 collection 이름이 같지만 다른 schema가 선언된 경우 SchemaNotReady exception이 발생할 수 있으니, collection과 schema는 pair로 관리되어야 합니다.
또한 텍스트 필드의 최대 길이는 문자가 아닌 바이트 단위로 계산됩니다. 문자열의 바이트 크기를 가져와 스키마의 바이트 제한에 맞게 잘라내어 DB성능의 향상을 꾀할 수도 있지만, DB내 생성될 record들의 데이터 길이에 대한 사전 정보가 없기 때문에 허용되는 최대값(65535)으로 제한을 설정하는 것이 좋습니다.


특정 필드는 사전에 정의된 namespace를 지켜주어야 합니다.
- primary key는 반드시 "pk"로 선언되어야 합니다.
- 계산된 vector는 "vector"로 선언되어야 합니다.
- text entry 는 반드시 "text"로 선언되어야 합니다.

In [7]:
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection, utility

In [8]:
# Milvus also supports schemaless operations if `enable_dynamic_fields=True`.

MAX_TITLE = 512
MAX_TEXT = 1024
MAX_VEC = 384

NAME = "WikiHow"

if NAME in utility.list_collections():
    whcollection = Collection(NAME)
    whcollection.drop()

whschema = CollectionSchema(
    fields=[
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=65535, default_value=""),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535, default_value=""),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=384)
    ],
    enable_dynamic_fields=False,
    description="WikiHow collection"
)
whcollection = Collection(
    name=NAME,
    schema=whschema,
    consistency_level="Session" # Make sure we read our own writes, otherwise allowed to be a bit out of date.
)

### MilvusDB 사용시 Batch 기반 입력
작은 BERT 모델을 사용하여 Milvus store내 배치할 문서에 대한 임베딩을 계산할 예정입니다.

이 예제에서 배치 크기 선택은 임의로 계산되었습니다. 사용자의 노트북 성능에 따라 최적의 Batch 연산 파라미터와 사용 가능한 모델이 다를 수 있습니다.

When the embedding model runs on GPU, the batch size should be selected so as to optimize the transfer-to-memory vs runtime overheads (too small and a major amount of time will be wasted on memory transfers instead of embedding proper, too large and it won't fit on the device).
If the model is accessed over the network, the batch size should be selected with the same concerns in mind, although further overhead may be incurred depending on how the model is scheduled or how the API is designed.


임베딩 모델이 GPU에서 실행되는 경우, 배치 크기는 메모리로의 전송과 런타임 오버헤드를 최적화할 수 있도록 선택해야 합니다.(배치 크기가 너무 작으면 제대로 임베딩되지 않고 CPU,GPU간 메모리 전송에 많은 시간이 낭비되고, 너무 크면 실행될 수 없습니다).
네트워크를 통해 모델에 액세스하는 경우, 모델 스케줄링 방식이나 API 설계 방식에 따라 추가 오버헤드가 발생할 수 있습니다.

With regard to milvus, the idea is the same: a batch size that's too small means incurring milvus' operational overhead along with communication overhead. The other tradeoff of note regards any temporary processing or data streaming that may occur: a higher batch size also implies loading more data into memory and possibly generating longer-lasting temporary artifacts before submitting the data to milvus, after which it can all be discarded.
MilvusDB에서도 record 입력 배치 크기가 너무 작으면 통신 오버헤드와 함께 MilvusDB의 운영 오버헤드가 발생할 수 있습니다. 배치 크기가 클수록 더 많은 데이터를 메모리에 로드하고 데이터를 밀버스에 저장하기 전에 더 오래 지속되는 임시 변수를 관리할 수 있어 record 입력이 용이합니다.


### 임베딩 모델 로딩
HuggingFaceEmbeddings의 MiniLM BERT model을 사용하여 임베딩을 게산합니다.

In [9]:
import langchain
from langchain.embeddings import HuggingFaceEmbeddings

In [10]:
embeddings = HuggingFaceEmbeddings(model_name='all-MiniLM-L6-v2')

In [11]:
BATCH_SIZE = 2048

batch = []
def insert_data(data):
    import math

    batch = []

    titles = list(data.keys())

    vecs = embeddings.embed_documents(titles)
    
    entries = [[], [], []]

    for b, title in enumerate(titles):
        text = title + ":\n"
        for cat in data[title]:
            text += cat + ":\n"
            text += "\n".join(data[title][cat])
            
        title_len_diff = len(title.encode('utf-16-le')) - len(title)
        text_len_diff = len(text.encode('utf-16-le')) - len(text)
        entries[0].append(title[:MAX_TITLE - title_len_diff])
        entries[1].append(text[:MAX_TEXT - text_len_diff])
        entries[2].append(vecs[b])

    whcollection.insert(entries)

import collections, tqdm
doc_data = collections.defaultdict(lambda: collections.defaultdict(list))
for i in tqdm.tqdm(range(len(doc_indexed)), total=len(doc_indexed)):
    if (type(doc_indexed.index[i][0]) is not str) or (type(doc_indexed.index[i][1]) is not str):
        continue
    die = False
    for col in ['text', 'overview', 'sectionLabel']:
        if type(doc_indexed.iloc[i][col]) is not str:
            die = True
            break
    if die:
        continue
    section_head = doc_indexed.index[i][0] + " (" + doc_indexed.iloc[i]['overview'].strip() + ")"
    category = doc_indexed.index[i][1]
    step = " ".join(map(lambda x: x.strip(), doc_indexed.iloc[i][['sectionLabel', 'text']]))

    if len(doc_data) % BATCH_SIZE == 1 and len(doc_data) != 1:
        insert_data(doc_data)
        doc_data = collections.defaultdict(lambda: collections.defaultdict(list))
    doc_data[section_head][category].append(step)
    if i == len(doc_indexed) - 1:
        insert_data(doc_data)

100%|█████████████████████████████████████████████████████████████████████| 1585695/1585695 [2:31:20<00:00, 174.64it/s]


In [12]:
# Milvus will not seal segments that are too small, a flush is necessary to force it.
whcollection.flush()

## Vector 인덱싱후 쿼리
벡터를 인덱싱하면 검색 속도를 크게 높일 수 있습니다. 여기서는 L2 norm과 flat indexing을 사용합니다. (파라미터: `IVF_FLAT`).

collection을 로딩하게 되면 Milvus는 메모리에 해당 collection을 로드하며 다양한 connection에서 해당 collection에 대한 쿼리를 실행할 준비를 하게 됩니다.
만약 langchain milvus store interface를 사용할 거라면 이후의 코드를 실행하지 마세요.



In [13]:
whcollection.create_index(field_name="vector", index_params={"metric_type": "L2", "index_type": "IVF_FLAT", "nlist": "1024"})

Status(code=0, message=)

In [14]:
whcollection.load()
# To actually use the data, we would have to do a `whcollection.load()` before any queries.
# Once done with queries, we should then use `whcollection.release()` to stop using resources

## Disconnect
DB에 접속 후 사용한 connection은 반드시 중지 시키는게 좋습니다.

In [15]:
whcollection.release()
connections.disconnect("default")