In [1]:
import os
import re
import json
import yaml
import sys

def parse_k8s_doc(file_path: str) -> dict:
    """
    Parse a Kubernetes doc file into a JSON-friendly dict.

    YAML front matter is optional. It is only recognised when the file starts
    (ignoring leading whitespace) with a ``---`` line and has a closing ``---``
    line; ``---`` horizontal rules further down the document are left in the
    body untouched.

    Args:
        file_path: Path of the UTF-8 encoded text/markdown file to read.

    Returns:
        A dict with the keys ``"title"`` (front-matter title, or the file name
        when there is none), ``"text"`` (body with Hugo shortcodes removed and
        surrounding whitespace stripped) and ``"source_file"`` (file name).

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()

    metadata = {}
    body = content

    # Front matter must open the file; anchoring with \A keeps a "---" rule in
    # the middle of the body from being mistaken for a delimiter.
    front_matter = re.match(
        r"\A\s*---[ \t]*\n(.*?)^---[ \t]*$",
        content,
        flags=re.MULTILINE | re.DOTALL,
    )
    if front_matter:
        # Keep everything after the closing delimiter, including later "---" rules.
        body = content[front_matter.end():]
        try:
            metadata = yaml.safe_load(front_matter.group(1))
        except yaml.YAMLError:
            # Malformed front matter: fall back to no metadata rather than fail.
            metadata = {}
        # Empty front matter loads as None and a bare scalar/list is not usable
        # as metadata; normalise both so .get() below is always safe.
        if not isinstance(metadata, dict):
            metadata = {}

    # Remove Hugo shortcodes: {{< ... >}} (opening and closing forms) and
    # {{% ... %}}. DOTALL lets a shortcode that spans several lines match too.
    cleaned_body = re.sub(r"\{\{<.*?>\}\}", "", body, flags=re.DOTALL)
    cleaned_body = re.sub(r"\{\{%.*?%\}\}", "", cleaned_body, flags=re.DOTALL)
    cleaned_body = cleaned_body.strip()

    file_name = os.path.basename(file_path)
    return {
        "title": metadata.get("title") or file_name,
        "text": cleaned_body,
        "source_file": file_name,
    }


def ingest_directory(input_dir: str, output_file: str):
    """
    Ingest all Kubernetes doc files in a directory and write them into one JSON file.

    The directory is walked recursively in sorted order so that repeated runs
    over the same tree produce the same JSON. Files ending in ``.md``,
    ``.markdown`` or ``.txt`` are parsed with ``parse_k8s_doc``; a file that
    fails to parse is reported and skipped.

    Args:
        input_dir: Root directory to scan.
        output_file: Path of the JSON file to write. Missing parent
            directories are created.

    Raises:
        OSError: If the output file cannot be written.
    """
    all_docs = []

    for root, dirs, files in os.walk(input_dir):
        # Sorting in place makes os.walk descend into sub-directories in a
        # stable order instead of the filesystem's arbitrary one.
        dirs.sort()
        for file in sorted(files):
            if file.endswith((".md", ".markdown", ".txt")):
                try:
                    doc = parse_k8s_doc(os.path.join(root, file))
                    all_docs.append(doc)
                except Exception as e:
                    # Deliberate best effort: one unreadable file must not
                    # abort the whole ingestion run.
                    print(f"⚠️ Skipping {file}: {e}")

    # Make sure the destination directory exists before opening the file.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_docs, f, indent=2, ensure_ascii=False)

    print(f"✅ Ingested {len(all_docs)} documents into {output_file}")


def running_in_jupyter() -> bool:
    """Report whether this code is executing under a Jupyter (IPython kernel) session."""
    try:
        from IPython import get_ipython

        # Any failure here (IPython not importable, no active shell object, ...)
        # is treated as "not in Jupyter" by the handler below.
        return "IPKernelApp" in get_ipython().config
    except Exception:
        return False


if __name__ == "__main__":
    if running_in_jupyter():
        # Notebook run: there is no command line, so use the default paths.
        input_dir = "../../glossary/"
        output_file = "../data/k8s_docs.json"
        ingest_directory(input_dir, output_file)
    else:
        # Script run: both paths must be given on the command line.
        import argparse

        parser = argparse.ArgumentParser(description="Ingest Kubernetes docs into JSON")
        parser.add_argument(
            "--input_dir",
            required=True,
            help="Directory to scan recursively for .md/.markdown/.txt files (e.g. ../../glossary/)",
        )
        parser.add_argument(
            "--output_file",
            required=True,
            help="Path of the JSON file to write (e.g. ../data/k8s_docs.json)",
        )

        args = parser.parse_args()
        ingest_directory(args.input_dir, args.output_file)


✅ Ingested 159 documents into ../data/k8s_docs.json


In [2]:
import json
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Shared text splitter used by transform_doc: chunks of at most ~800 characters
# with a 100-character overlap between neighbours to retain context. The
# separator list runs from paragraph breaks down to the empty string.
splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_overlap=100,
    chunk_size=800,
)

def transform_doc(doc: dict, text_splitter=None) -> dict:
    """Clean up one doc: drop unused fields and chunk its text.

    Args:
        doc: One parsed document. The text is read from the ``"text"`` key
            (the key written by the ingestion step of this notebook), falling
            back to ``"body"`` for inputs that use that name instead.
        text_splitter: Optional object exposing ``split_text(str) -> list``.
            When omitted, the module-level ``splitter`` is used.

    Returns:
        A dict with ``"title"``, ``"content_type"``, ``"source_file"``,
        ``"weight"`` (each ``""`` when missing from ``doc``) and ``"chunks"``,
        the list of text chunks (empty when the document has no text).
    """
    # Reading only "body" here would silently yield zero chunks for every
    # document whose text is stored under "text".
    body = doc.get("text") or doc.get("body") or ""

    # Defensive shortcode removal in case the input was not cleaned before:
    # {{< ... >}} (opening and closing forms) and {{% ... %}}. DOTALL lets a
    # shortcode that spans several lines match too.
    cleaned_body = re.sub(r"\{\{<.*?>\}\}", "", body, flags=re.DOTALL)
    cleaned_body = re.sub(r"\{\{%.*?%\}\}", "", cleaned_body, flags=re.DOTALL)
    cleaned_body = cleaned_body.strip()

    # Split into chunks; the splitter is only looked up when there is text.
    if cleaned_body:
        active_splitter = splitter if text_splitter is None else text_splitter
        chunks = active_splitter.split_text(cleaned_body)
    else:
        chunks = []

    return {
        "title": doc.get("title", ""),
        "content_type": doc.get("content_type", ""),
        "source_file": doc.get("source_file", ""),
        "weight": doc.get("weight", ""),
        "chunks": chunks
    }

def transform_json(input_file: str, output_file: str):
    """Load a JSON list of docs, run transform_doc on each, and write the result as JSON.

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
    """
    with open(input_file, "r", encoding="utf-8") as source:
        raw_docs = json.load(source)

    # Transform the documents one by one, preserving their input order.
    transformed = []
    for raw_doc in raw_docs:
        transformed.append(transform_doc(raw_doc))

    with open(output_file, "w", encoding="utf-8") as target:
        json.dump(transformed, target, indent=2, ensure_ascii=False)

    print(f"✅ Transformed {len(transformed)} docs into {output_file}")


if __name__ == "__main__":
    # Default paths for notebook runs. The input is a JSON file, so it is named
    # input_file; this also avoids overwriting the notebook-level `input_dir`
    # (a directory path) with a file path.
    input_file = "../data/k8s_docs.json"
    output_file = "../data/k8s_docs_parsed.json"
    transform_json(input_file, output_file)

✅ Transformed 159 docs into ../data/k8s_docs_parsed.json


In [3]:
import os
import re
import json
import yaml

def _extract_section(markdown, keyword):
    """Return the stripped text under the first ``##`` heading line containing ``keyword``.

    The heading must sit on a single line (case-insensitive match); the section
    runs until the next line starting with ``##`` or the end of the text.
    Returns ``""`` when no such heading exists.
    """
    section_match = re.search(
        rf"^##[^\n]*{re.escape(keyword)}[^\n]*\n(.*?)(?=^##|\Z)",
        markdown,
        re.MULTILINE | re.DOTALL | re.IGNORECASE,
    )
    return section_match.group(1).strip() if section_match else ""


def parse_markdown_file(file_path):
    """Parse one markdown file into a ``{"title", "text", "source_file"}`` dict.

    The file must start (ignoring leading whitespace) with YAML front matter
    delimited by ``---`` lines. The text is the "synopsis" section followed by
    the "examples" section, when present.

    Args:
        file_path: Path of the UTF-8 encoded markdown file.

    Returns:
        The parsed dict, or ``None`` when the file has no front matter. The
        title falls back to the file name when the front matter has none.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()

    # Front matter must open the file; anchoring with \A keeps "---" rules in
    # the body from being mistaken for front-matter delimiters.
    frontmatter_match = re.match(
        r"\A\s*---[ \t]*\n(.*?)^---[ \t]*$", content, re.MULTILINE | re.DOTALL
    )
    if not frontmatter_match:
        return None
    try:
        frontmatter = yaml.safe_load(frontmatter_match.group(1))
    except yaml.YAMLError:
        # Malformed front matter: continue without metadata.
        frontmatter = None
    # Empty front matter loads as None and a bare scalar/list has no .get().
    if not isinstance(frontmatter, dict):
        frontmatter = {}

    # Look for sections only after the front matter.
    body = content[frontmatter_match.end():]
    synopsis = _extract_section(body, "synopsis")
    examples = _extract_section(body, "examples")

    # Combine the useful content, skipping whichever section is empty.
    combined_text = "\n\n".join(part for part in (synopsis, examples) if part)

    file_name = os.path.basename(file_path)
    return {
        "title": frontmatter.get("title") or file_name,
        "text": combined_text,
        "source_file": file_name,
    }


def convert_directory_to_json(input_dir, output_file):
    """Parse every ``.md`` file under ``input_dir`` and write the results to one JSON file.

    The tree is walked recursively in sorted order so repeated runs produce the
    same JSON. Files for which ``parse_markdown_file`` returns a falsy value
    are left out; a file that raises while being parsed is reported and
    skipped.

    Args:
        input_dir: Root directory to scan.
        output_file: Path of the JSON file to write. Missing parent
            directories are created.

    Raises:
        OSError: If the output file cannot be written.
    """
    docs = []
    for root, dirs, files in os.walk(input_dir):  # walk recursively
        # Sorting in place makes os.walk descend in a stable order.
        dirs.sort()
        for file_name in sorted(files):
            if not file_name.endswith(".md"):
                continue
            file_path = os.path.join(root, file_name)
            try:
                doc = parse_markdown_file(file_path)
            except Exception as e:
                # Best effort: one bad file must not abort the whole
                # conversion.
                print(f"⚠️ Skipping {file_name}: {e}")
                continue
            if doc:
                docs.append(doc)

    # Make sure the destination directory exists before opening the file.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(docs, f, indent=2, ensure_ascii=False)


# Run the kubectl conversion: parse the markdown files under ../../kubectl/
# and write the resulting documents to ../data/kubernetes_docs.json.
convert_directory_to_json("../../kubectl/", "../data/kubernetes_docs.json")


Concatenate the two JSON files

In [4]:
import json

def concat_json_files(file1, file2, output_file):
    """Load two JSON files, join their contents with ``+``, and write the result as JSON.

    Args:
        file1: Path of the first JSON file; its content comes first.
        file2: Path of the second JSON file; its content is appended.
        output_file: Path of the JSON file to write.
    """
    loaded = []
    for source_path in (file1, file2):
        with open(source_path, "r", encoding="utf-8") as source:
            loaded.append(json.load(source))

    first, second = loaded
    merged = first + second

    with open(output_file, "w", encoding="utf-8") as target:
        json.dump(merged, target, indent=2, ensure_ascii=False)

# Merge the glossary documents (k8s_docs.json) and the kubectl documents
# (kubernetes_docs.json) into a single combined JSON file.
concat_json_files("../data/k8s_docs.json", "../data/kubernetes_docs.json", "../data/kubernetes_docs_combined.json")
