In [0]:
%pip install openai
%pip install aiohttp
%pip install pymilvus
%pip install PyGithub
%pip install mmh3

In [0]:
%restart_python

In [0]:
import openai
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import numpy as np
import logging
import uuid
import json
import httpx
import requests
from github import Github
import mmh3
import datetime
from typing import List
from dotenv import load_dotenv
import os

In [0]:
# Notebook parameter: the GitHub repository to ingest, exposed as a text widget
# labelled "GitHub Repo". The second argument is "" (presumably the widget's
# default value — confirm), so `repo` may be empty until a value is supplied.
dbutils.widgets.text("repo", "", "GitHub Repo")
repo = dbutils.widgets.get("repo")

In [0]:
# Pull settings from a .env file (if one is present) and then read the
# credentials from the process environment. os.getenv returns None for a
# variable that is not set, so each of these may be None.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GITHUB_PAT = os.getenv('GITHUB_PAT')
ZILLIZ_CLOUD_URI = os.getenv("ZILLIZ_CLOUD_URI")
ZILLIZ_API_KEY = os.getenv("ZILLIZ_API_KEY")

# OpenAI Model Configuration
OPENAI_EMBEDDING_MODEL = "text-embedding-3-small"
# NOTE(review): OPENAI_CHAT_MODEL is not referenced by any cell visible in this notebook.
OPENAI_CHAT_MODEL = "gpt-4.1-mini"
# Width of the dense vector field; it has to match the size of the vectors
# produced by OPENAI_EMBEDDING_MODEL.
EMBEDDING_DIMENSION = 1536

In [0]:
# Names of the two Milvus collections this notebook writes to: one holding
# dense embeddings and one holding hashed sparse vectors.
MILVUS_COLLECTION_NAME = "github_dense_index"
MILVUS_GITHUB_SPARSE_COLLECTION="github_sparse_index"

# Global variables
# Placeholders for the collection handles; both stay None until the setup
# cells further down assign the real Collection objects.
collection = None
collection_github_sparse = None


In [0]:
def get_openai_client():
    """Return the notebook-wide async OpenAI client.

    The client is stored in the module-level name ``client``, which is bound by
    a separate cell (to ``None`` when no API key is available). The name is
    looked up at call time, so a kernel in which that cell has not been run yet
    is reported with the same configuration error instead of a bare
    ``NameError``.

    Returns:
        The object currently bound to the global ``client``.

    Raises:
        Exception: if ``client`` is unbound or ``None``.
    """
    configured_client = globals().get("client")
    if configured_client is None:
        raise Exception("OpenAI API key not configured")
    return configured_client


In [0]:
# Milvus setup
def setup_milvus_collection(collection_name):
    """Open the dense-vector collection ``collection_name``, creating it if needed.

    An existing collection is returned as-is (no schema or index changes).
    Otherwise a collection with the fields id / repo / title / content /
    chunk / embedding is created and a COSINE ``IVF_FLAT`` index is built on
    the ``embedding`` field.

    Args:
        collection_name: Name of the collection to open or create.

    Returns:
        The ``Collection`` handle, either existing or newly created.
    """
    if utility.has_collection(collection_name):
        existing = Collection(collection_name)
        print(f"Connected to existing collection: {collection_name}")
        return existing

    # Layout of a brand-new collection; the vector width follows EMBEDDING_DIMENSION.
    schema = CollectionSchema(
        fields=[
            FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
            FieldSchema(name="repo", dtype=DataType.VARCHAR, max_length=200),
            FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=500),
            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=20000),
            FieldSchema(name="chunk", dtype=DataType.INT64),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIMENSION),
        ],
        description="Knowledge base for RAG",
    )
    created = Collection(name=collection_name, schema=schema)

    # Vector index used for similarity search on the embedding field.
    created.create_index(
        field_name="embedding",
        index_params={
            "metric_type": "COSINE",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 128},
        },
    )

    print(f"Created new collection: {collection_name}")
    return created

In [0]:
def setup_github_sparse_collection():
    """Open the sparse GitHub collection, creating it on first use.

    The collection name comes from ``MILVUS_GITHUB_SPARSE_COLLECTION``. An
    existing collection is returned unchanged; a new one gets the fields
    id / repo / path / chunk / sparse_emb plus an inner-product
    ``SPARSE_INVERTED_INDEX`` on ``sparse_emb``.

    Returns:
        The ``Collection`` handle for the sparse index.
    """
    collection_name = MILVUS_GITHUB_SPARSE_COLLECTION

    # Fast path: nothing to create.
    if utility.has_collection(collection_name):
        existing = Collection(collection_name)
        print(f"Connected to existing sparse collection: {collection_name}")
        return existing

    schema = CollectionSchema(
        fields=[
            FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=200, is_primary=True),
            FieldSchema(name="repo", dtype=DataType.VARCHAR, max_length=200),
            FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=1024),
            FieldSchema(name="chunk", dtype=DataType.INT64),
            FieldSchema(name="sparse_emb", dtype=DataType.SPARSE_FLOAT_VECTOR),
        ],
        description="Sparse (BM25-like) index for GitHub files",
    )
    created = Collection(name=collection_name, schema=schema)

    # Sparse vectors are searched by inner product over an inverted index.
    created.create_index(
        field_name="sparse_emb",
        index_params={
            "metric_type": "IP",
            "index_type": "SPARSE_INVERTED_INDEX",
            "params": {},
        },
    )
    print(f"Created new sparse collection: {collection_name}")
    return created


In [0]:
def _fetch_github_files(repo_full_name: str) -> List[dict]:
    token = GITHUB_PAT
    if not token:
        raise Exception("GITHUB_PAT not configured on server")
    try:
        g = Github(token)
        repo = g.get_repo(repo_full_name)
        def walk(path: str = "") -> List[dict]:
            acc = []
            items = repo.get_contents(path)
            for item in items:
                if item.type == 'dir':
                    acc.extend(walk(item.path))
                else:
                    if item.path.endswith((".py", ".md", ".sql", ".yml")):
                        acc.append({"path": item.path, "content": str(item.decoded_content, 'utf-8', errors='ignore')})
            return acc
        return walk("")
    except Exception as e:
        raise (f"GitHub access error: {e}")

In [0]:
# Embedding functions
async def get_embedding(text: str) -> List[float]:
    """Embed ``text`` with the configured OpenAI embedding model.

    Newlines are replaced by spaces and surrounding whitespace is stripped
    before the text is sent. Any failure — obtaining the client, the API call,
    or reading the response — is re-raised as a generic ``Exception`` whose
    message starts with "Error generating embedding: ".

    Args:
        text: Raw text to embed.

    Returns:
        The ``embedding`` attribute of the first item in the API response.
    """
    try:
        api = get_openai_client()
        # Flatten to a single line before embedding.
        flattened = text.replace("\n", " ").strip()
        reply = await api.embeddings.create(
            input=flattened,
            model=OPENAI_EMBEDDING_MODEL,
        )
        return reply.data[0].embedding
    except Exception as e:
        raise Exception(f"Error generating embedding: {str(e)}")

def chunk_text_with_overlap(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """Split ``text`` into windows of ``chunk_size`` characters that overlap.

    Each window starts ``chunk_size - overlap`` characters after the previous
    one, so neighbouring chunks share ``overlap`` characters. Splitting stops
    as soon as a window reaches the end of the text; the final chunk may be
    shorter than ``chunk_size``.

    Args:
        text: Text to split. An empty string yields an empty list.
        chunk_size: Maximum length of each chunk; must be positive.
        overlap: Characters shared by consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in order of appearance.

    Raises:
        ValueError: if ``chunk_size`` or ``overlap`` is out of range. Without
            this check an ``overlap`` of at least ``chunk_size`` never advances
            the window (infinite loop), and a negative ``overlap`` silently
            skips text between chunks.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    chunks = []
    text_length = len(text)
    start = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        chunks.append(text[start:end])
        if end == text_length:
            break
        # Step back by `overlap` so the next window re-reads the tail of this one.
        start = end - overlap
    return chunks

In [0]:
def _tokenize(text: str) -> List[str]:
    import re
    tokens = re.findall(r"[A-Za-z0-9_]+", text.lower())
    # basic filtering
    return [t for t in tokens if len(t) >= 2]

def _compute_sparse_embedding(text: str) -> dict:
    """Compute a simple TF-weighted sparse embedding mapped by hashed term ids."""
    from collections import Counter
    terms = _tokenize(text)
    if not terms:
        return {}
    counts = Counter(terms)
    sparse = {}
    for term, tf in counts.items():
        idx = mmh3.hash(term, signed=False) % 1000000  # hash to a large dim space
        weight = 1.0 + (tf ** 0.5)  # sublinear tf
        sparse[idx] = weight
    # Sort by index
    return dict(sorted(sparse.items()))

def chunk_text(text: str, max_len: int = 20000) -> List[str]:
    """Cut ``text`` into consecutive, non-overlapping pieces of at most ``max_len`` characters.

    The last piece may be shorter; an empty ``text`` gives an empty list.
    """
    pieces: List[str] = []
    for offset in range(0, len(text), max_len):
        pieces.append(text[offset:offset + max_len])
    return pieces

In [0]:
async def ingest_github(repo: str, collection=None, collection_github_sparse=None):
    """Embed a GitHub repository into the dense and sparse Milvus collections.

    Every file returned by ``_fetch_github_files`` is split with
    ``chunk_text_with_overlap``. Each chunk is embedded and inserted into
    ``collection``; a hashed sparse vector of the same chunk is inserted into
    ``collection_github_sparse`` under the same id. After each non-empty file,
    two audit rows (one "dense", one "sparse") are appended to the insert-log
    table. Once all files are processed both collections are loaded.

    A chunk whose embedding or insert fails is reported and skipped; it does
    not abort the run.

    Args:
        repo: Repository identifier, forwarded to ``_fetch_github_files`` and
            stored in the ``repo`` field of every row.
        collection: Dense Milvus collection handle (required despite the
            ``None`` default).
        collection_github_sparse: Sparse Milvus collection handle (required
            despite the ``None`` default).

    Returns:
        None.

    Raises:
        Exception: if either collection handle is ``None``; errors raised by
            ``_fetch_github_files`` propagate unchanged.
    """
    table_name = "tabular.dataexpert.mlivshutz54984_vector_insert_log"

    # Use existing Milvus collection
    if collection is None:
        raise Exception("Milvus collection not initialized")
    if collection_github_sparse is None:
        raise Exception("Milvus GitHub sparse collection not initialized")
    print(f"[{datetime.datetime.now().isoformat()}] Starting ingestion for repo: {repo} into dense index: {collection.name} and sparse index: {collection_github_sparse.name}")

    files = _fetch_github_files(repo)
    # NOTE: `count` is incremented once per successfully stored chunk, even
    # though the log field and the final message call it "files ingested".
    count = 0
    change_feed_enabled = False
    for f in files:
        text = f.get("content", "")
        if not text:
            continue
        chunks = chunk_text_with_overlap(text)
        for chunk_idx, chunk in enumerate(chunks):
            try:
                # Dense insert
                emb = await get_embedding(chunk)
                # Deterministic id derived from repo, path and chunk number.
                # NOTE(review): rows are inserted, not upserted, so re-running
                # the ingestion for the same repo presumably stores the same
                # ids again — confirm whether `upsert` is wanted here.
                chunk_id = str(mmh3.hash128(repo + '/' + f["path"] + f"_chunk_{chunk_idx}", signed=False))
                data = [
                    [chunk_id],                    # id
                    [repo],                        # repo
                    [f["path"]],                   # title (without chunk number)
                    [chunk],                       # content
                    [chunk_idx],                   # chunk number
                    [emb]                          # embedding
                ]
                collection.insert(data)

                # Sparse insert for the same chunk
                sparse = _compute_sparse_embedding(chunk)
                if sparse:
                    sparse_data = [
                        [chunk_id],
                        [repo],
                        [f["path"]],                   # path (without chunk number)
                        [chunk_idx],                   # chunk number
                        [sparse]
                    ]
                    collection_github_sparse.insert(sparse_data)

                count += 1
            except Exception as e:
                print(f"Failed to create a dense or a sparse vector index: Skipping {f.get('path')} chunk {chunk_idx}: {e}")
                continue

        # Audit rows for this file; `files_ingested` is the running total so far.
        end_time = datetime.datetime.now().isoformat()
        log_list = [{
            "files_ingested": count,
            "vector_index_type": "dense",
            "collection": collection.name,
            "created_at": end_time,
            "repo": repo,
            "file_name": f.get("path", "")
        },
        {
            "files_ingested": count,
            "vector_index_type": "sparse",
            "sparse_collection": collection_github_sparse.name,
            "created_at": end_time,
            "repo": repo,
            "file_name": f.get("path", "")
        }]
        spark.createDataFrame(log_list).write.mode("append").saveAsTable(table_name)
        if not change_feed_enabled:
            # The table property is loop-invariant: set it once, right after the
            # first write (by which point the table has been written to),
            # instead of issuing the same ALTER TABLE after every file.
            spark.sql(f"alter table {table_name} set TBLPROPERTIES ('delta.enableChangeDataFeed' = true)")
            change_feed_enabled = True

    collection.load()
    collection_github_sparse.load()
    # Take the completion timestamp here rather than reusing the per-file
    # `end_time`: that name is never bound when the repository yields no
    # non-empty files, which used to crash this final message.
    finished_at = datetime.datetime.now().isoformat()
    print(f"[{finished_at}] Ingestion completed for repo: {repo}. Files ingested: {count}")

    return

In [0]:
# Build the shared async OpenAI client once for the whole notebook.
if OPENAI_API_KEY:
    # Also set the key on the openai module itself (presumably for code paths
    # that use the module-level API — confirm it is still needed).
    openai.api_key = OPENAI_API_KEY
    # Give the SDK a dedicated httpx client created with default settings.
    # NOTE(review): the intent was a client "without proxies" (an earlier
    # variant passed proxies=None), but nothing here disables proxies
    # explicitly — confirm whether environment proxy settings must be ignored.
    async_http_client = httpx.AsyncClient()
    client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY, http_client=async_http_client)
else:
    # No key available: get_openai_client() reports this None as a
    # configuration error when an embedding is requested.
    client = None

In [0]:
# Register the "default" Milvus connection against the Zilliz Cloud endpoint,
# authenticating with the API key read from the environment.
connections.connect(
    alias="default",
    uri=ZILLIZ_CLOUD_URI,
    token=ZILLIZ_API_KEY,
    secure=True
)
# Only reached when connect() returned without raising.
print("Successfully connected to Zilliz Cloud!")

In [0]:
# Initialize collection
# Opens the dense collection named by MILVUS_COLLECTION_NAME, creating it
# (schema + vector index) when it does not exist yet.
collection = setup_milvus_collection(MILVUS_COLLECTION_NAME)

In [0]:
# Open (or create on first run) the sparse collection that mirrors the dense one.
collection_github_sparse = setup_github_sparse_collection()


In [0]:
# Ingest GitHub repo
# Top-level await: relies on the notebook running cells inside an event loop.
# `repo` comes from the "repo" widget; both collection handles must already
# have been initialised by the setup cells.
await ingest_github(repo, collection, collection_github_sparse)
