In [1]:
import json
import logging

from pathlib import Path
from typing import Any, Dict, List

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import SparseVectorParams, Modifier
from qdrant_client.models import Distance, VectorParams
from dotenv import load_dotenv
from fastembed import SparseTextEmbedding

from src.config.settings import Config

# Environment, API clients and project configuration shared by every cell below.
load_dotenv()
openai_client = OpenAI()
config = Config()

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Attach the stream handler only once: re-running this cell on a live kernel
# would otherwise stack handlers and emit every log record multiple times.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

# Qdrant connection. prefer_grpc=True was used as a workaround for timeout
# errors; it is currently disabled (commented out below).
client = QdrantClient(
    host=config.qdrant_host,
    port=config.qdrant_port,
    # prefer_grpc=True
)

# Sparse (BM25-style) text embedding model used alongside the dense embeddings.
bm25_embedding_model = SparseTextEmbedding(config.sparse_embedding_model)


def get_embedding(text: str) -> List[float]:
    """Return the OpenAI embedding vector for ``text``.

    The model name is read from ``config.embedding_model``. Any exception
    raised while requesting or unpacking the embedding is printed and
    swallowed; in that case a 1536-element all-zero vector is returned.
    """
    fallback_dim = 1536
    try:
        result = openai_client.embeddings.create(
            model=config.embedding_model,
            input=text,
        )
        first_item = result.data[0]
        return first_item.embedding
    except Exception as exc:
        print(f"Error getting embedding: {exc}")
        # Best-effort fallback: zero vector on any error
        return [0] * fallback_dim


def reformat_for_embedding(entry: dict) -> str:
    """
    Flatten one grammar entry into a newline-separated text block for embedding.

    Sections are emitted in a fixed order and only when present: Korean name,
    Russian name, level, description, usage form, numbered examples, notes.

    Args:
        entry: Grammar entry. Recognised keys are ``grammar_name_kr``,
            ``grammar_name_rus``, ``level``, ``description``, ``usage_form``,
            ``examples`` (iterable of dicts with ``korean`` / ``russian``) and
            ``notes`` (a list of notes, or a single note string).

    Returns:
        The labelled sections joined with ``"\\n"``; an empty string when the
        entry has none of the recognised keys.
    """
    level_mapping = {
        1: "Начинающий",
        2: "Базовый",
        3: "Средний",
        4: "Выше среднего",
        5: "Продвинутый",
        6: "Экспертный"
    }

    parts = []

    # Names are included whenever the key exists, even if the value is empty
    if "grammar_name_kr" in entry:
        parts.append(f"НАЗВАНИЕ НА КОРЕЙСКОМ: {entry['grammar_name_kr']}")
    if "grammar_name_rus" in entry:
        parts.append(f"НАЗВАНИЕ НА РУССКОМ: {entry['grammar_name_rus']}")

    # Unknown levels fall back to a generic "Level N" label
    if "level" in entry:
        level_value = entry["level"]
        level_name = level_mapping.get(level_value, f"Level {level_value}")
        parts.append(f"Level: {level_name} ({level_value})")

    description = entry.get("description")
    if description:
        parts.append(f"ОПИСАНИЕ: {description}")

    usage_form = entry.get("usage_form")
    if usage_form:
        parts.append(f"ФОРМА: {usage_form}")

    for idx, example in enumerate(entry.get("examples") or [], start=1):
        korean = example.get("korean", "")
        russian = example.get("russian", "")
        parts.append(f"ПРИМЕР {idx}: НА КОРЕЙСКОМ: {korean} | НА РУССКОМ: {russian}")

    notes = entry.get("notes")
    if notes:
        # A bare string is a single note; joining it directly would split it
        # into characters ("a; b; c; ...").
        if isinstance(notes, str):
            notes = [notes]
        # str() keeps non-string notes (e.g. numbers) from breaking the join
        notes_combined = "; ".join(str(note) for note in notes)
        parts.append(f"ПРИМЕЧАНИЯ: {notes_combined}")

    return "\n".join(parts)



def load_json_entries(dir_path: str) -> List[Dict[str, Any]]:
    """Load JSON grammar entries from a directory or from a single JSON file.

    Args:
        dir_path: Either a directory, whose ``*.json`` files are read in sorted
            name order, or the path of one ``.json`` file.

    Returns:
        A flat list of entries. A file whose top-level value is a list
        contributes all its items; any other top-level value contributes one
        entry. A missing path or a non-JSON file yields an empty list.

    Raises:
        json.JSONDecodeError: If a selected file does not contain valid JSON.
    """
    path = Path(dir_path)

    if path.is_dir():
        # Sorted so the resulting entry order is stable across runs
        json_files = sorted(p for p in path.glob("*.json") if p.is_file())
    elif path.is_file() and path.name.endswith('.json'):
        json_files = [path]
    else:
        json_files = []

    entries: List[Dict[str, Any]] = []
    for json_file in json_files:
        with open(json_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, list):
            entries.extend(data)
        else:
            entries.append(data)

    return entries

def create_qdrant_collection(collection_name: str = config.qdrant_collection_name) -> None:
    """Ensure the named Qdrant collection exists, creating it when missing.

    A new collection gets one dense vector named after ``config.embedding_model``
    (1536 dimensions, cosine distance, stored on disk) and one sparse vector
    named after ``config.sparse_embedding_model`` with the IDF modifier.
    An existing collection is left untouched; either outcome is logged.
    """
    if client.collection_exists(collection_name):
        logger.info(f"Collection {collection_name} already exists")
        return

    dense_vectors = {
        config.embedding_model: VectorParams(
            size=1536,
            distance=Distance.COSINE,
            on_disk=True,
        ),
    }
    sparse_vectors = {
        # INFO has GRPC version for Modifier
        config.sparse_embedding_model: SparseVectorParams(modifier=Modifier.IDF),
    }

    # INFO Scalar (INT8) quantization can be configured here for low-RAM hosts;
    # INFO see https://qdrant.tech/documentation/guides/optimize/ for details.
    # It is currently not enabled.
    client.create_collection(
        collection_name=collection_name,
        vectors_config=dense_vectors,
        sparse_vectors_config=sparse_vectors,
    )
    logger.info(f"Collection {collection_name} created")

Create a Qdrant collection and load grammar entries

In [2]:
# Create collection
create_qdrant_collection()

# Load grammar entries
data_dir = Path("../data/grammar-level-1")
all_entries_file = data_dir / "entries.json"

# Fail fast when the input is missing: nothing below can run without
# `entries`, and on a reused kernel a stale `entries` from an earlier run
# would otherwise be picked up silently.
if not all_entries_file.exists():
    raise FileNotFoundError(
        f"{all_entries_file} not found. "
        "Please run parse_md_to_json.py first to generate JSON files."
    )

entries = load_json_entries(str(all_entries_file))

print(f"{len(entries)} grammar entries to upload")

Collection korean_grammar created


40 grammar entries to upload


Upload entries to Qdrant

In [3]:
# Build one Qdrant point per entry, carrying both its dense and sparse vectors.
# The position of the entry in `entries` is used as the point id and the raw
# entry is stored as the payload.
points = []
for point_id, entry in enumerate(entries):
    embedding_text = reformat_for_embedding(entry)
    named_vectors = {
        config.embedding_model: get_embedding(embedding_text),
        config.sparse_embedding_model: next(
            bm25_embedding_model.embed(embedding_text)
        ).as_object(),
    }
    points.append(
        models.PointStruct(id=point_id, vector=named_vectors, payload=entry)
    )

print(f"Generated {len(points)} points")

Generated 40 points


In [4]:
# Write the prepared points into the configured collection
target_collection = config.qdrant_collection_name
client.upsert(collection_name=target_collection, points=points)

print(f"Upload complete. {len(points)} entries added to {target_collection} collection.")
print("You can now query the collection using the Qdrant client.")

Upload complete. 40 entries added to korean_grammar collection.
You can now query the collection using the Qdrant client.


Testing search

In [5]:
from typing import Optional, List, Dict
from pydantic import BaseModel
from qdrant_client.models import Prefetch, SparseVector

# Free-text query used by the search experiments in this notebook
query = "планирую сделать"

# Dense query embedding
vector_query = get_embedding(query)
# Sparse query embedding, wrapped in Qdrant's SparseVector model
sparse_vector_query = SparseVector(
    **next(bm25_embedding_model.query_embed(query)).as_object()
)

# Number of results requested and minimum score passed to the queries
top_k, threshold = 5, 0

class RetrievedDocs(BaseModel):
    """Schema for one search hit after conversion from a Qdrant point."""
    # Text of the hit; the search cells fill it from the payload's "description"
    content: str
    # Payload fields of the hit
    metadata: dict
    # Score reported by Qdrant for the hit
    score: float
    # Cross-encoder score; stays None until the reranking cell assigns it
    cross_score: Optional[float] = None

# Sparse (BM25) stage passed as `prefetch` to the hybrid query
bm_25_prefetch = [
    Prefetch(
        using=config.sparse_embedding_model,
        query=sparse_vector_query,
        score_threshold=threshold,
        limit=top_k,
    )
]

Hybrid search

In [11]:
print(f"Performing hybrid search with top_k={top_k}, threshold={threshold}")

# Hybrid query: the BM25 prefetch supplies the candidates, and the dense query
# vector is used for the final scoring.
hits = client.query_points(
    collection_name=config.qdrant_collection_name,
    using=config.embedding_model,
    query=vector_query,
    limit=top_k,
    prefetch=bm_25_prefetch,
    score_threshold=threshold,
    with_payload=True
).points

# Convert to schema objects.
# "description" is optional in the ingested entries (reformat_for_embedding
# skips it when absent), so fall back to an empty string instead of raising
# KeyError on such a hit.
# NOTE(review): the metadata filter drops a "content" key, but the uploaded
# payload does not appear to contain one — confirm whether "description" was meant.
docs = [
    RetrievedDocs(
        content=hit.payload.get("description") or "",
        metadata={k: v for k, v in hit.payload.items() if k != "content"},
        score=hit.score
    ) for hit in hits
]

Performing hybrid search with top_k=5, threshold=0


In [19]:
# Debug aid: dump the client's instance attributes (connection options etc.).
# NOTE: the dump includes the `api_key` option — clear this output before
# sharing the notebook if a key is configured.
vars(client)

{'_inference_inspector': <qdrant_client.embed.type_inspector.Inspector at 0x7f58389e4200>,
 '_embedding_model_name': None,
 '_sparse_embedding_model_name': None,
 '_embed_inspector': <qdrant_client.embed.embed_inspector.InspectorEmbed at 0x7f5839f090d0>,
 '_init_options': {'location': None,
  'url': None,
  'port': 6333,
  'grpc_port': 6334,
  'prefer_grpc': False,
  'https': None,
  'api_key': None,
  'prefix': None,
  'timeout': None,
  'host': 'localhost',
  'path': None,
  'force_disable_check_same_thread': False,
  'grpc_options': None,
  'auth_token_provider': None,
  'cloud_inference': False,
  'check_compatibility': True},
 '_client': <qdrant_client.qdrant_remote.QdrantRemote at 0x7f5837e79fd0>,
 'cloud_inference': False}

In [16]:

# Russian grammar name, retrieval score and cross-encoder score of each hit
[
    (hit_doc.metadata["grammar_name_rus"], hit_doc.score, hit_doc.cross_score)
    for hit_doc in docs
]

[('окончание намерения «собираться что-то сделать»', 0.29411525, None),
 ('необходимость, обязанность «должен сделать»', 0.24075162, None),
 ('выражение желания «хотеть сделать»', 0.24012865, None),
 ('вопрос о сомнении или приглашении «не сделать ли…?»', 0.20112832, None)]

In [8]:
# Russian grammar name, retrieval score and cross-encoder score of each hit
list(
    (hit_doc.metadata["grammar_name_rus"], hit_doc.score, hit_doc.cross_score)
    for hit_doc in docs
)

[('окончание намерения «собираться что-то сделать»', 0.29411525, None),
 ('необходимость, обязанность «должен сделать»', 0.24075162, None),
 ('выражение желания «хотеть сделать»', 0.24012865, None),
 ('вопрос о сомнении или приглашении «не сделать ли…?»', 0.20112832, None)]

Testing out search without prefetch

In [10]:
print(f"Performing search WITHOUT prefetching with top_k={top_k}, threshold={threshold}")

# Dense-only query: same parameters as the hybrid search, minus the BM25 prefetch
hits = client.query_points(
    collection_name=config.qdrant_collection_name,
    using=config.embedding_model,
    query=vector_query,
    limit=top_k,
    # prefetch=bm_25_prefetch,
    score_threshold=threshold,
    with_payload=True
).points

# Convert to schema objects.
# "description" is optional in the ingested entries (reformat_for_embedding
# skips it when absent), so fall back to an empty string instead of raising
# KeyError on such a hit.
# NOTE(review): the metadata filter drops a "content" key, but the uploaded
# payload does not appear to contain one — confirm whether "description" was meant.
docs_2 = [
    RetrievedDocs(
        content=hit.payload.get("description") or "",
        metadata={k: v for k, v in hit.payload.items() if k != "content"},
        score=hit.score
    ) for hit in hits
]

Performing search WITHOUT prefetching with top_k=5, threshold=0


In [11]:
# Russian grammar name, retrieval score and cross-encoder score of each dense-only hit
[
    (hit_doc.metadata["grammar_name_rus"], hit_doc.score, hit_doc.cross_score)
    for hit_doc in docs_2
]

[('прогрессивная форма «(сейчас) делаю»', 0.2976943, None),
 ('окончание намерения «собираться что-то сделать»', 0.29411525, None),
 ('побудительная форма «давайте сделаем…»', 0.24894285, None),
 ('необходимость, обязанность «должен сделать»', 0.24091646, None),
 ('выражение желания «хотеть сделать»', 0.24012865, None)]

In [12]:
# Sparse-only (BM25) query against the same collection, top 3 hits with payload
search_result = client.query_points(
    collection_name=config.qdrant_collection_name,
    query=sparse_vector_query,
    using=config.sparse_embedding_model,
    limit=3,
    with_payload=True,
    # Optional payload filter, currently disabled:
    # query_filter=Filter(
    #     must=[FieldCondition(key="level", match=MatchValue(value="1"))]
    # ),
).points

In [13]:
from sentence_transformers import CrossEncoder

# Score every (query, document text) pair with the cross-encoder
cross_input = [[query, doc.content] for doc in docs]
scores = CrossEncoder(config.reranking_model).predict(cross_input)

# Attach each cross-encoder score to its document and report the change
for idx, (doc, score) in enumerate(zip(docs, scores)):
    doc.cross_score = score
    print(f"Document {idx} reranking: {doc.score:.4f} -> {score:.4f}")

# Highest cross-encoder score first; `docs` itself keeps its retrieval order
sorted_docs = sorted(docs, key=lambda item: item.cross_score, reverse=True)

Document 0 reranking: 0.2941 -> 7.6691
Document 1 reranking: 0.2409 -> 8.1982
Document 2 reranking: 0.2401 -> 8.7999
Document 3 reranking: 0.2011 -> 7.9227


In [14]:
# Reranked order: Russian grammar name with the original retrieval score
[
    (ranked.metadata["grammar_name_rus"], ranked.score)
    for ranked in sorted_docs
]

[('выражение желания «хотеть сделать»', 0.24012865),
 ('необходимость, обязанность «должен сделать»', 0.24091646),
 ('вопрос о сомнении или приглашении «не сделать ли…?»', 0.20112832),
 ('окончание намерения «собираться что-то сделать»', 0.29411525)]

In [15]:
# Retrieval order, for comparison: Russian grammar name with the retrieval score
[
    (hit_doc.metadata["grammar_name_rus"], hit_doc.score)
    for hit_doc in docs
]

[('окончание намерения «собираться что-то сделать»', 0.29411525),
 ('необходимость, обязанность «должен сделать»', 0.24091646),
 ('выражение желания «хотеть сделать»', 0.24012865),
 ('вопрос о сомнении или приглашении «не сделать ли…?»', 0.20112832)]