# 고급 RAG 실습 (v2): .env + Pinecone + Ollama
- 날짜: 2025-09-29
- 이 노트북은 **.env**에서 환경변수를 불러와 **Pinecone**과 **Ollama**를 함께 사용하는 하이브리드 RAG 파이프라인을 제공합니다.
- 구성: **MIRACL-ko 공개 데이터** → Dense(BGE-M3) + Sparse(BM25) → RRF/Weighted → **BGE Reranker** → **Ollama 생성**.

## 0) .env 템플릿 (로컬에 파일 생성)
아래 내용을 프로젝트 루트의 `.env` 파일에 저장하세요.

```
PINECONE_API_KEY=pc_********************************
PC_CLOUD=aws
PC_REGION=us-east-1
PC_INDEX_NAME=hybrid-miracl-ko

# Ollama 기본 호스트 (로컬 실행 시)
OLLAMA_HOST=http://localhost:11434
# 선호 모델 예: llama3.1:8b-instruct, qwen2.5:7b-instruct 등
OLLAMA_MODEL=llama3.1:8b-instruct
```

## 1) 설치

In [1]:
# !pip -q install -U pip
!pip -q install -U python-dotenv requests pinecone pinecone-text FlagEmbedding datasets rank_bm25 matplotlib tqdm
# 선택: torch 최신 (환경에 따라 생략 가능)
# !pip -q install -U torch --index-url https://download.pytorch.org/whl/cpu

In [3]:
!pip uninstall -y pinecone pinecone-client pinecone-plugin-inference
!pip install -U pinecone


Found existing installation: pinecone 7.3.0
Uninstalling pinecone-7.3.0:
  Successfully uninstalled pinecone-7.3.0
[0mCollecting pinecone
  Using cached pinecone-7.3.0-py3-none-any.whl.metadata (9.5 kB)
Using cached pinecone-7.3.0-py3-none-any.whl (587 kB)
Installing collected packages: pinecone
Successfully installed pinecone-7.3.0


## 2) 환경 변수 로드 (.env) & 구성
- `.env`를 읽어 **PINECONE_API_KEY**, **OLLAMA_HOST**, 기타 파라미터를 설정합니다.
- α 프리셋 / N·k 템플릿도 환경 변수 또는 기본값으로 지정합니다.

In [3]:
import os
from dotenv import load_dotenv

load_dotenv()

# Pinecone
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "")
PC_CLOUD  = os.getenv("PC_CLOUD", "aws")
PC_REGION = os.getenv("PC_REGION", "us-east-1")
INDEX_NAME = os.getenv("PC_INDEX_NAME", "hybrid-miracl-ko")
METRIC = os.getenv("PC_METRIC", "dotproduct")

# Ollama
OLLAMA_HOST  = os.getenv("OLLAMA_HOST", "http://localhost:11434")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b-instruct")

# 실습 스케일
CORPUS_MAX = int(os.getenv("CORPUS_MAX", "8000"))
N_QUERIES  = int(os.getenv("N_QUERIES", "30"))

# α 프리셋
ALPHA_PRESETS = {"balanced":0.5, "semantic_heavy":0.7, "keyword_heavy":0.3}
ALPHA = float(os.getenv("ALPHA", ALPHA_PRESETS["balanced"]))

# N/k 템플릿
TEMPLATES = {
    "speed":   {"N": 50,  "k": 3},
    "balanced":{"N": 100, "k": 5},
    "quality": {"N": 200, "k": 10}
}
N = int(os.getenv("RETRIEVER_N", TEMPLATES["balanced"]["N"]))
K = int(os.getenv("LLM_K",       TEMPLATES["balanced"]["k"]))

print("Pinecone:", INDEX_NAME, PC_CLOUD, PC_REGION, METRIC)
print("Ollama:", OLLAMA_HOST, OLLAMA_MODEL)
print("Scale:", CORPUS_MAX, N_QUERIES, " | α=", ALPHA, " N=", N, " K=", K)
assert PINECONE_API_KEY, "PINECONE_API_KEY is required (set in .env)"

Pinecone: hybrid-miracl-ko aws us-east-1 dotproduct
Ollama: http://localhost:11434 llama3.1:8b-instruct
Scale: 8000 30  | α= 0.5  N= 100  K= 5


## 3) Ollama 채팅 함수 (요청하신 패턴 반영)
- 아래 함수는 `.env`의 `OLLAMA_HOST`를 기본으로 사용합니다.

In [4]:
import json, time, requests
from typing import List, Dict

def chat_ollama(model: str, messages: List[Dict], stream: bool = False, host: str = None):
    host = host or os.getenv("OLLAMA_HOST", "http://localhost:11434")
    url = f"{host}/api/chat"
    headers = {"Content-Type":"application/json"}
    payload = {"model": model, "messages": messages, "stream": stream}
    t0 = time.time()
    resp = requests.post(url, headers=headers, data=json.dumps(payload), timeout=120)
    dt = time.time() - t0
    resp.raise_for_status()
    data = resp.json()
    text = data.get("message", {}).get("content", "")
    return text, dt

## 4) 공개 데이터 로드 (MIRACL-ko)
- 쿼리/정답(dev), 코퍼스(train)을 로드합니다.
- 코퍼스는 `CORPUS_MAX` 만큼만 샘플링합니다.

In [5]:
from datasets import load_dataset
import pandas as pd

queries_ds = load_dataset("Cohere/miracl-ko-queries-22-12", split="dev")
# MIRACL 한국어 코퍼스 샤드 (docs-0.jsonl.gz 등)
url = "https://huggingface.co/datasets/miracl/miracl-corpus/resolve/main/miracl-corpus-v1.0-ko/docs-0.jsonl.gz"

corpus_ds = load_dataset("json", data_files=url, split="train")

print(corpus_ds)
print(corpus_ds[0])

corpus_ds = load_dataset(
    "json",
    data_files="https://huggingface.co/datasets/miracl/miracl-corpus/resolve/main/miracl-corpus-v1.0-ko/docs-*.jsonl.gz",
    split="train"
)


if CORPUS_MAX and len(corpus_ds) > CORPUS_MAX:
    corpus_ds = corpus_ds.select(range(CORPUS_MAX))

corpus_df = corpus_ds.to_pandas().rename(columns={"docid":"doc_id"})
corpus_df["doc_id"] = corpus_df["doc_id"].astype(str)

# 검색 후 문서 텍스트를 회수하기 위해 lookup dict 준비
ID2TEXT  = dict(zip(corpus_df["doc_id"], corpus_df["text"].fillna("").astype(str)))
ID2TITLE = dict(zip(corpus_df["doc_id"], corpus_df["title"].fillna("").astype(str)))

corpus_df.head(2)

  from .autonotebook import tqdm as notebook_tqdm


Dataset({
    features: ['docid', 'title', 'text'],
    num_rows: 500000
})
{'docid': '5#0', 'title': '지미 카터', 'text': '제임스 얼 "지미" 카터 주니어(, 1924년 10월 1일 ~ )는 민주당 출신 미국 39번째 대통령 (1977년 ~ 1981년)이다.'}


Unnamed: 0,doc_id,title,text
0,5#0,지미 카터,"제임스 얼 ""지미"" 카터 주니어(, 1924년 10월 1일 ~ )는 민주당 출신 미..."
1,5#1,지미 카터,지미 카터는 조지아주 섬터 카운티 플레인스 마을에서 태어났다. 조지아 공과대학교를 ...


In [6]:
len(corpus_df)

8000

## 5) Sparse (BM25) 인코더 적합

In [7]:
from pinecone_text.sparse import BM25Encoder
from tqdm import trange
import numpy as np, pickle, os

texts = (corpus_df["title"].fillna("") + " " + corpus_df["text"].fillna("")).tolist()
bm25 = BM25Encoder()
bm25.fit(texts)
sparse_vectors = bm25.encode_documents(texts)

os.makedirs("artifacts", exist_ok=True)
with open("artifacts/bm25_encoder.pkl","wb") as f:
    pickle.dump(bm25, f)

len(sparse_vectors), len(texts)

100%|██████████| 8000/8000 [00:03<00:00, 2131.37it/s]


(8000, 8000)

## 6) Dense 임베딩: BGE-M3

In [8]:
import math, numpy as np

CANDIDATE_TEXT_KEYS = ("text", "content", "body", "doc", "passage", "sentence")

def to_text(x):
    """입력 x를 안전한 str로 변환(실패 시 None)."""
    if x is None:
        return None
    # NaN 처리 (float/np.float 계열)
    if isinstance(x, (float, np.floating)) and math.isnan(x):
        return None
    # bytes -> str
    if isinstance(x, (bytes, bytearray)):
        try:
            return x.decode("utf-8", errors="ignore")
        except Exception:
            return None
    # dict에서 텍스트 필드 추출
    if isinstance(x, dict):
        for k in CANDIDATE_TEXT_KEYS:
            if k in x:
                return to_text(x[k])
        return None
    # (토큰 리스트가 아니라) 문자열 조합 필요한 경우
    if isinstance(x, (list, tuple)):
        # 이미 토큰화된 list[str]은 허용되지만 여기선 concat해 하나의 문장으로 만듦
        if all(isinstance(t, str) for t in x):
            return " ".join(x)
        # 그렇지 않으면 문자열화 시도
        try:
            return " ".join(map(str, x))
        except Exception:
            return None
    # 넘파이 스칼라/정수 등 → 문자열
    if isinstance(x, (int, np.integer)) or isinstance(x, (np.floating,)):
        return str(x)
    # 그 외 객체 → 문자열화 시도
    if not isinstance(x, str):
        try:
            x = str(x)
        except Exception:
            return None
    return x


In [9]:
raw_texts = texts  # 기존 리스트/시리즈/제너레이터 등

cleaned_texts = []
bad_indices = []
for i, x in enumerate(raw_texts):
    s = to_text(x)
    if s is None:
        bad_indices.append(i)
        continue
    s = s.strip()
    if not s:
        bad_indices.append(i)
        continue
    cleaned_texts.append(s)

if bad_indices:
    print(f"[warn] {len(bad_indices)}개의 레코드가 문자열이 아니거나 비어 있어 제외됨. 예시 인덱스: {bad_indices[:10]}")


### 아래 임베딩 모델 로드 및 임베딩 문서 저장 시 메모리 부족이 뜰 수 있어서, Batch 32개씩 진행 (53분 소요), 따라서 더 빠르게 고사양으로 진행하고자 한다면, Batch를 64, 128로 진행해서 시간을 최적화할 것!


In [10]:
import os, torch, numpy as np
from tqdm import trange
from FlagEmbedding import BGEM3FlagModel

# 🤫 토크나이저 멀티프로세스 경고 억제용 환경변수.
# 병렬 토크나이징으로 인한 경고/출력 섞임을 피하고 싶을 때 False로 둡니다.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# ⚙️ 사용 가능한 CUDA(GPU)가 있는지 확인
use_cuda = torch.cuda.is_available()

# 장치 선택: GPU가 있으면 "cuda:0", 없으면 "cpu"
device = "cuda:0" if use_cuda else "cpu"

# FP16(half precision) 사용 여부: 보통 GPU가 있을 때만 켭니다.
# - 장점: 메모리 절약, 속도 향상 가능
# - 단점: 아주 드문 경우 수치 정밀도 이슈
use_fp16 = use_cuda

# 🧠 BGE-M3 임베딩 모델 로드
# - "BAAI/bge-m3" 허깅페이스 허브에서 받아옵니다(최초 1회 캐시 후 재사용).
# - use_fp16: 위에서 결정한 half precision 사용
# - devices: 어떤 디바이스에 올릴지 지정 (예: "cuda:0" 또는 "cpu")
bge = BGEM3FlagModel("BAAI/bge-m3", use_fp16=use_fp16, devices=device)

# ✂️ 최대 토큰 길이: 512~1024 권장(길수록 더 긴 문맥을 커버하지만 속도/VRAM 증가)
MAX_LEN = 1024

# 📦 배치 크기: 16~32부터 시작해서 VRAM/메모리에 맞춰 조정
BATCH   = 32

# 모든 문서의 dense 임베딩을 담아둘 리스트 (배치별로 쌓았다가 마지막에 합칩니다)
dense_vecs = []

# 🔒 추론 모드: 자동 미분 OFF → 메모리 절약 & 속도 향상
with torch.inference_mode():
    # trange: 진행상황(progress bar)을 보여주는 range
    for i in trange(0, len(cleaned_texts), BATCH):
        # 현재 배치 슬라이싱 (✅ 반드시 list[str] 형태여야 합니다)
        batch = cleaned_texts[i:i+BATCH]

        # ✅ 임베딩 추출
        # - batch_size: 내부 처리 배치 크기 (보통 BATCH와 동일하게 둠)
        # - max_length: 토큰 최대 길이(초과분은 모델 토크나이저에서 잘립니다)
        # - return_dense: dense 임베딩(ANN, 벡터DB용) 반환
        # - return_sparse: BM25 유사한 sparse(토큰 기반) 벡터 반환 여부 (여기선 미사용)
        # - return_colbert_vecs: ColBERT 스타일 토큰 단위 벡터 반환 여부 (여기선 미사용)
        out = bge.encode(
            batch,
            batch_size=BATCH,
            max_length=MAX_LEN,
            return_dense=True,
            return_sparse=False,
            return_colbert_vecs=False,
        )

        # 모델 출력에서 dense 임베딩(Numpy ndarray)을 꺼냅니다. shape: (batch, dim)
        dense = out["dense_vecs"]

        # 🔄 L2 정규화: 각 벡터를 단위벡터로(normalize) 만들어 코사인 유사도 계산을 안정화
        # - np.linalg.norm(..., axis=1, keepdims=True): 각 행(문서)별 L2 노름
        # - 분모가 0인(edge) 경우가 드물지만 있을 수 있으니, 필요시 eps를 더해 방어코드 추가 가능
        dense = dense / np.linalg.norm(dense, axis=1, keepdims=True)

        # 배치 결과를 리스트에 쌓아둠
        dense_vecs.append(dense)

# 🔧 여러 배치로 쌓인 배열들을 한 번에 세로로 이어붙이기
# - 결과 shape: (num_texts, dim)
dense_vecs = np.vstack(dense_vecs).astype("float32")  # 벡터DB 호환/메모리 절약 위해 float32 캐스팅

# 벡터 차원(dimension) 확인
dense_dim = dense_vecs.shape[1]

# 전체 개수와 차원 출력 (예: (N, 1024) 1024)
print(dense_vecs.shape, dense_dim)


Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 234318.66it/s]
pre tokenize: 100%|██████████| 1/1 [00:00<00:00, 83.48it/s]
You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.
Inference Embeddings: 100%|██████████| 1/1 [00:06<00:00,  6.64s/it]
pre tokenize: 100%|██████████| 1/1 [00:00<00:00, 101.29it/s]
Inference Embeddings: 100%|██████████| 1/1 [00:11<00:00, 11.56s/it]
pre tokenize: 100%|██████████| 1/1 [00:00<00:00, 84.92it/s]
Inference Embeddings: 100%|██████████| 1/1 [00:04<00:00,  4.84s/it]
pre tokenize: 100%|██████████| 1/1 [00:00<00:00, 134.67it/s]
Inference Embeddings: 100%|██████████| 1/1 [00:04<00:00,  4.48s/it]
pre tokenize: 100%|██████████| 1/1 [00:00<00:00, 120.05it/s]
Inference Embeddings: 100%|██████████| 1/1 [00:03<00:00,  3.52s/it]
pre tokenize: 100%|██████████| 1/1 [00:00<00:00, 106.5

(8000, 1024) 1024





## 7) Pinecone 서버리스 인덱스 생성 & 업서트

In [12]:
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=PINECONE_API_KEY)

if INDEX_NAME not in pc.list_indexes().names():
    pc.create_index(
        name=INDEX_NAME,
        dimension=int(dense_dim),
        metric=METRIC,
        spec=ServerlessSpec(cloud=PC_CLOUD, region=PC_REGION)
    )

index = pc.Index(INDEX_NAME)
print(index.describe_index_stats())


batch = []
for i in trange(len(corpus_df)):
    _id = corpus_df.iloc[i]["doc_id"]
    meta = {"title": ID2TITLE[_id], "lang":"ko", "source":"miracl-ko-wiki"}
    batch.append({
        "id": _id,
        "values": dense_vecs[i].tolist(),
        "sparse_values": sparse_vectors[i],
        "metadata": meta
    })
    if len(batch) >= 200:
        index.upsert(vectors=batch)
        batch = []
if batch:
    index.upsert(vectors=batch)

print("Upsert complete.")
print(index.describe_index_stats())

{'dimension': 1024,
 'index_fullness': 0.0,
 'metric': 'dotproduct',
 'namespaces': {'': {'vector_count': 3600}},
 'total_vector_count': 3600,
 'vector_type': 'dense'}


100%|██████████| 8000/8000 [09:27<00:00, 14.11it/s]


Upsert complete.
{'dimension': 1024,
 'index_fullness': 0.0,
 'metric': 'dotproduct',
 'namespaces': {'': {'vector_count': 8000}},
 'total_vector_count': 8000,
 'vector_type': 'dense'}


## 8) 검색 함수: Dense / Sparse / Weighted(α) / RRF

In [13]:
from typing import List, Dict
from collections import defaultdict

def encode_query_dense(q: str):
    return bge.encode([q], max_length=8192)["dense_vecs"][0].astype("float32")

def encode_query_sparse(q: str):
    return bm25.encode_queries([q])[0]

def dense_only_search(query: str, top_k=20):
    dv = encode_query_dense(query).tolist()
    res = index.query(vector=dv, top_k=top_k, include_metadata=True)
    return res.matches

def sparse_only_search(query: str, top_k=20):
    sv = encode_query_sparse(query)
    res = index.query(sparse_vector=sv, top_k=top_k, include_metadata=True)
    return res.matches

def hybrid_weighted_search(query: str, top_k=20, alpha=ALPHA):
    dv = encode_query_dense(query)
    sv = encode_query_sparse(query)
    dv = (dv * alpha).tolist()
    sv_scaled = {"indices": sv["indices"], "values": [v*(1.0-alpha) for v in sv["values"]]}
    res = index.query(vector=dv, sparse_vector=sv_scaled, top_k=top_k, include_metadata=True)
    return res.matches

def rrf_fusion(query: str, top_k=20, per_list_k=50, k_const=60):
    dres = dense_only_search(query, top_k=per_list_k)
    sres = sparse_only_search(query, top_k=per_list_k)
    meta = {}
    ranks = defaultdict(list)
    for rlist in [dres, sres]:
        for rank, m in enumerate(rlist, start=1):
            meta[m.id] = m.metadata
            ranks[m.id].append(rank)
    scores = []
    for _id, rks in ranks.items():
        sc = sum(1.0 / (k_const + rk) for rk in rks)
        scores.append((_id, sc))
    scores.sort(key=lambda x: x[1], reverse=True)
    return [{"id": _id, "score": sc, "metadata": meta.get(_id, {})} for _id, sc in scores[:top_k]]

## 9) Re-rank (Cross-Encoder): BGE Reranker

In [None]:
from FlagEmbedding import FlagReranker

reranker = FlagReranker("BAAI/bge-reranker-v2-m3", use_fp16=use_fp16)

def rerank_ce(query: str, candidates: List[Dict], top_k=K):
    pairs, id2meta = [], {}
    for m in candidates:
        if hasattr(m, "id"):
            _id, meta = m.id, m.metadata
        else:
            _id, meta = m["id"], m.get("metadata", {})
        title = meta.get("title","")
        pairs.append([query, title])
        id2meta[_id] = meta
    scores = reranker.compute_score(pairs)
    items = [{"id": _id, "score": float(sc), "metadata": id2meta[_id]} for (_id, sc) in zip(id2meta.keys(), scores)]
    items.sort(key=lambda x: x["score"], reverse=True)
    return items[:top_k]

## 10) 생성(Answer): Ollama + 근거 컨텍스트 주입

In [None]:
def collect_context(doc_ids: List[str], max_chars: int = 3000) -> str:
    parts, total = [], 0
    for did in doc_ids:
        t = ID2TITLE.get(did, "")
        x = ID2TEXT.get(did, "")
        snippet = (x[:700] + "...") if len(x) > 700 else x
        block = f"[{did}] {t}\n{snippet}"
        if total + len(block) > max_chars:
            break
        parts.append(block)
        total += len(block)
    return "\n\n".join(parts)

def answer_with_ollama(query: str, topk_items: List[Dict], model: str = None):
    model = model or os.getenv("OLLAMA_MODEL", "llama3.1:8b-instruct")
    doc_ids = [m["id"] if isinstance(m, dict) else m.id for m in topk_items]
    ctx = collect_context(doc_ids, max_chars=3000)

    system = (
        "당신은 한국어 RAG 어시스턴트입니다. 아래 '근거 컨텍스트'에 포함된 내용만 사용하여 간결하고 정확하게 답하세요. "
        "확실하지 않으면 모른다고 말하고 추가 정보를 요청하세요. "
        "필요하면 문서 ID로 각 근거를 표기하세요."
    )
    user = f"질문: {query}\n\n[근거 컨텍스트]\n{ctx}"
    messages = [{"role":"system","content":system},{"role":"user","content":user}]
    text, dt = chat_ollama(model=model, messages=messages, stream=False)
    return text, dt, ctx

## 11) 데모: 질의 → Hybrid+CE → Ollama

In [None]:
demo_queries = [q for q in queries_ds["query"][:10] if len(q) > 5][:3]

for q in demo_queries:
    print("="*100)
    print("질문:", q)
    cand = hybrid_weighted_search(q, top_k=N, alpha=ALPHA)
    topk = rerank_ce(q, cand, top_k=K)
    try:
        answer, dt, ctx = answer_with_ollama(q, topk)
        print("[생성 소요]", f"{dt:.2f}s")
        print("[답변]\n", answer)
        print("\n[근거 컨텍스트]\n", ctx[:1000], "...")
    except Exception as e:
        print("Ollama 호출 실패:", e)
        print("→ OLLAMA_HOST 설정 및 서버 실행 여부를 확인하세요.")