# Setup

In [1]:
import json
from glob import glob
from tqdm import tqdm
from pydantic import Field
from typing import Optional
from docarray.typing import NdArray
from docarray import BaseDoc, DocList
from pymilvus import DataType, MilvusClient, utility, connections
from FlagEmbedding import BGEM3FlagModel

from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def convert_objectid(doc):
    if "_id" in doc:
        doc["_id"] = str(doc["_id"])
        
    if 'timestamp' in doc:
        doc['timestamp'] = int(doc['timestamp'].timestamp())

    return doc

In [3]:
mongo_client = MongoClient('localhost', port=21002)
db = mongo_client['stockelper']
news = db.news.find({})
news = list(news)
news = [convert_objectid(doc) for doc in news]

In [4]:
print(news[0])
print(len(news))

{'_id': '6721b36253e39d04879c6ee4', 'timestamp': 1730261112, 'query': 'BNK금융지주', 'title': 'BNK금융지주 3분기 영업이익 2천415억원…작년比 10.1%↓', 'press': '연합뉴스', 'summary': '시장 전망치 19.3% 하회', 'content': '(서울=연합뉴스) 코스피 상장사 BNK금융지주는 연결 기준 올해 3분기 영업이익이 2천415억원으로 지난해 동기보다 10.08% 감소한 것으로 잠정 집계됐다고 30일 공시했다.이번 영업이익은 연합인포맥스가 집계한 시장 전망치 2천991억원을 19.3% 하회했다.매출은 2조2천849억원으로 작년 동기 대비 2.69% 감소했다. 순이익은 2천184억원으로 2.86% 늘었다.stock_news@yna.co.kr', 'url': 'https://n.news.naver.com/mnews/article/001/0015015564?sid=101', 'origin': 'https://m.yna.co.kr/view/AKR20241030084400527?input=1195t'}
49929


In [5]:
class Document(BaseDoc):
    _id: str
    timestamp: int
    query: str
    title: str
    press: str
    summary: Optional[str] = None
    content: str
    url: str
    origin: str
    embedding: NdArray[1024] = Field(is_embedding=True)



# Create Index

In [6]:
milvus_client = MilvusClient(uri='http://localhost:21001')
connections.connect(host='localhost', port='21001')
# if utility.has_collection("stockelper"):
#     utility.drop_collection("stockelper")

In [21]:
# 스키마 정의
schema = MilvusClient.create_schema(
    auto_id=False,
    enable_dynamic_field=True,
)
# 필드 이름을 news 데이터에 맞게 수정
schema.add_field(field_name="_id", datatype=DataType.VARCHAR, is_primary=True, max_length=65535)
schema.add_field(field_name='timestamp', datatype=DataType.INT32)
schema.add_field(field_name='title', datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name='press', datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name='summary', datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name='url', datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name='origin', datatype=DataType.VARCHAR, max_length=65535)
schema.add_field(field_name='embedding', datatype=DataType.FLOAT_VECTOR, dim=1024)

{'auto_id': False, 'description': '', 'fields': [{'name': '_id', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}, 'is_primary': True, 'auto_id': False}, {'name': 'timestamp', 'description': '', 'type': <DataType.INT32: 4>}, {'name': 'title', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'press', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'summary', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'content', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'url', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'origin', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'embedding', 'description': '', 'type': <DataType.FLOAT_VECTOR: 101>, 'params': {'dim': 1024}}], 'enable_dynami

In [22]:
index_params = milvus_client.prepare_index_params()
index_params.add_index(field_name="id")
index_params.add_index(
    field_name="embedding", 
    index_type="AUTOINDEX",
    metric_type="IP"
)

In [9]:
milvus_client.create_collection(
    collection_name="stockelper",
    schema=schema,
    index_params=index_params
)

RPC error: [create_collection], <MilvusException: (code=65535, message=create duplicate collection with different parameters, collection: stockelper)>, <Time:{'RPC start': '2024-11-22 17:46:12.695891', 'RPC error': '2024-11-22 17:46:12.700964'}>
Failed to create collection: stockelper


MilvusException: <MilvusException: (code=65535, message=create duplicate collection with different parameters, collection: stockelper)>

# Upload Data

In [None]:
embedding_function = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False, device='cuda')

In [29]:
ids[0]

'00004d1f0516a1af2533e4338b3e4327'

In [31]:
i

NameError: name 'i' is not defined

In [28]:
docs = []
collection = Collection("stockelper")
ids = [i['id'] for i in collection.query(expr="id != ''", output_fields=["_id"])]

a = 0
for data in tqdm(news):
    if data['_id'] in ids:
        a += 1
print(a)
#     tmp = f"title: {data['title']}\ncontent: {data['content']}"
#     embedding = embedding_function.encode(tmp, return_dense=True, return_sparse=False, batch_size=24)
#     data['embedding'] = np.array(embedding['dense_vecs'])
#     docs.append(Document(**data))

# docs = DocList[Document](docs)

100%|██████████| 49929/49929 [00:05<00:00, 8425.79it/s]

0





In [None]:
# 데이터 배치 크기 설정
batch_size = 1000  # 적절히 조절하세요
data_batch = []

# 이미 Milvus DB에 존재하는 ID를 확인하기 위한 집합
existing_ids = set()

# Milvus에서 기존 데이터의 ID를 가져옵니다.
# 기존 쿼리에서 'id is not null'을 'id != ""'로 변경
existing_entities = collection.query(expr="id != ''", output_fields=["id"])
existing_ids = {entity['id'] for entity in existing_entities}  # '_id'를 'id'로 변경

for idx, doc in enumerate(tqdm(docs)):
    data = doc.dict()
    # 'embedding' 필드를 리스트로 변환
    if isinstance(data['embedding'], np.ndarray):
        data['embedding'] = data['embedding'].tolist()
    
    # 'summary' 필드가 None이면 빈 문자열로 대체
    if data['summary'] is None:
        data['summary'] = ""
    
    # 중복된 데이터 확인 (Milvus DB에 이미 존재하는 ID)
    if data['id'] not in existing_ids:
        data_batch.append(data)
    
    # 배치 크기만큼 데이터가 모이면 Milvus에 삽입
    if (idx + 1) % batch_size == 0:
        try:
            res = milvus_client.insert(
                collection_name="stockelper",
                data=data_batch
            )
            data_batch = []  # 배치 초기화
        except Exception as e:
            print(f"데이터 삽입 중 오류 발생: {e}")

# 남은 데이터 삽입 (배치 크기와 관계없이)
if data_batch:  # 이 부분은 변경되지 않음
    try:
        res = milvus_client.insert(
            collection_name="stockelper",
            data=data_batch
        )
    except Exception as e:
        print(f"데이터 삽입 중 오류 발생: {e}")

# Test

In [12]:
from pymilvus import Collection

connections.connect(host='localhost', port='21001')

# 컬렉션 연결
collection = Collection("stockelper")

# 컬렉션 정보 출력
print(f"컬렉션 이름: {collection.name}")
print(f"필드 정보: {collection.schema.fields}")
print(f"엔티티 수: {collection.num_entities}")  # 총 데이터의 수 출력
# 추가된 코드
total_entities = collection.num_entities  # 총 데이터 수를 변수에 저장
print(f"총 데이터 수: {total_entities}")  # 총 데이터 수 출력

컬렉션 이름: stockelper
필드 정보: [{'name': 'id', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}, 'is_primary': True, 'auto_id': False}, {'name': 'timestamp', 'description': '', 'type': <DataType.INT32: 4>}, {'name': 'title', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'press', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'summary', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'content', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'url', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'origin', 'description': '', 'type': <DataType.VARCHAR: 21>, 'params': {'max_length': 65535}}, {'name': 'embedding', 'description': '', 'type': <DataType.FLOAT_VECTOR: 101>, 'params': {'dim': 1024}}]
엔티티 수: 30065
총 데이터 수: 30065


In [None]:
embedding_function = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False, device='cpu')

In [None]:
import numpy as np

query = '삼성전자'
query_vector = embedding_function.encode(query)['dense_vecs'].tolist()

# 검색 파라미터 설정
search_params = {
    "metric_type": "IP",  # 또는 컬렉션 생성 시 사용한 metric_type
    "params": {"nprobe": 10}
}

# 검색 수행
results = collection.search(
    data=[query_vector],  # 검색할 벡터 리스트
    anns_field="embedding",  # 검색할 필드 이름
    param=search_params,
    limit=5,  # 반환할 결과 수
    output_fields=["title", "summary", "content", "timestamp"]  # 반환할 필드 목록
)

# 검색 결과 출력
for hits in results:
    for hit in hits:
        print(f"ID: {hit.id}, 거리: {hit.distance}")
        print(f"Title: {hit.entity.get('title')}")
        print(f"Summary: {hit.entity.get('summary')}")
        print(f"Content: {hit.entity.get('content')}")
        timestamp = hit.entity.get('timestamp')
        formatted_timestamp = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
        print(f"Timestamp: {formatted_timestamp}")


In [None]:
import json
from tqdm import tqdm
from pymongo import MongoClient
from pymilvus import connections, DataType, MilvusClient, utility
from FlagEmbedding import BGEM3FlagModel
from datetime import datetime
import numpy as np

# MongoDB 클라이언트 연결
mongo_client = MongoClient('localhost', port=21002)
db = mongo_client['stockelper']
news_collection = db.news

# MilvusDB 클라이언트 연결
milvus_client = MilvusClient(uri='http://localhost:21001')
connections.connect(host='localhost', port='21001')

# Milvus 컬렉션 확인 및 생성
collection_name = "stockelper"
if not utility.has_collection(collection_name):
    # 스키마 정의
    schema = MilvusClient.create_schema(
        auto_id=False,
        enable_dynamic_field=True,
    )
    schema.add_field(field_name="id", datatype=DataType.VARCHAR, is_primary=True, max_length=65535)
    schema.add_field(field_name='timestamp', datatype=DataType.INT32)
    schema.add_field(field_name='title', datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name='press', datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name='summary', datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name='url', datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name='origin', datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name='embedding', datatype=DataType.FLOAT_VECTOR, dim=1024)
    
    index_params = milvus_client.prepare_index_params()
    index_params.add_index(field_name="id")
    index_params.add_index(
        field_name="embedding", 
        index_type="AUTOINDEX",
        metric_type="IP"
    )
    
    # 컬렉션 생성
    milvus_client.create_collection(
        collection_name=collection_name,
        schema=schema,
        index_params=index_params
    )

# 임베딩 함수 초기화
embedding_function = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False, device='cuda')

# MilvusDB에서 데이터 조회
try:
    milvus_data = milvus_client.query(
        collection_name=collection_name,
        filter="id != ''",  # 모든 데이터를 가져오기 위해 유효한 조건 사용
        output_fields=["id"],
        limit=10000  # 조회할 데이터 수 제한
    )
    existing_ids = {data['id'] for data in milvus_data}
except Exception as e:
    print(f"Failed to query collection: {collection_name}, error: {e}")
    existing_ids = set()  # 데이터가 없을 경우 빈 집합으로 초기화

# MongoDB에서 데이터 가져오기
news_data = news_collection.find({})

print(len(existing_ids))
print(existing_ids[0])

# 데이터 대조 및 처리
new_docs = []
for data in tqdm(news_data):
    data["_id"] = str(data["_id"])  # ObjectId를 문자열로 변환
    if data["_id"] in existing_ids:
        continue  # 이미 존재하면 스킵

    # 데이터 준비 및 임베딩 생성
    tmp = f"title: {data['title']}\ncontent: {data['content']}"
    embedding = embedding_function.encode(tmp, return_dense=True, return_sparse=False, batch_size=24)
    data['embedding'] = np.array(embedding['dense_vecs'])

    # MilvusDB에 저장할 문서 생성
    milvus_doc = {
        "id": data["_id"],
        "timestamp": int(data['timestamp'].timestamp()) if 'timestamp' in data else None,
        "title": data['title'],
        "press": data['press'],
        "summary": data['summary'],
        "content": data['content'],
        "url": data['url'],
        "origin": data['origin'],
        "embedding": data['embedding'].tolist()
    }
    new_docs.append(milvus_doc)


In [None]:
# 데이터 검증 함수
def validate_and_prepare_data(data):
    """Validate and prepare data to match Milvus schema."""
    validated_data = {
        "id": data.get("_id", ""),
        "timestamp": data['timestamp'],
        "title": data.get("title", ""),
        "press": data.get("press", ""),
        "summary": data.get("summary", ""),  # summary 필드가 None이면 빈 문자열로 설정
        "content": data.get("content", ""),
        "url": data.get("url", ""),
        "origin": data.get("origin", ""),
        "embedding": data["embedding"].tolist() if "embedding" in data else []
    }
    return validated_data

# MilvusDB에 삽입할 데이터 준비
milvus_docs = []
for data in tqdm(new_docs, desc="Validating and preparing data"):
    tmp = f"title: {data['title']}\ncontent: {data['content']}"
    embedding = embedding_function.encode(tmp, return_dense=True, return_sparse=False, batch_size=24)
    data['embedding'] = np.array(embedding['dense_vecs'])
    
    # 데이터 검증 및 준비
    validated_data = validate_and_prepare_data(data)
    milvus_docs.append(validated_data)

# Milvus에 데이터 삽입
if milvus_docs:
    milvus_client.insert(collection_name=collection_name, data=milvus_docs)
    print(f"{len(milvus_docs)} documents inserted into Milvus.")
else:
    print("No new documents to insert.")
