#### Load the saved articles data from Google Drive.

In [1]:
from google.colab import drive

# Mount Google Drive at /content/MyDrive so files stored there can be read by path.
drive.mount('/content/MyDrive')

Drive already mounted at /content/MyDrive; to attempt to forcibly remount, call drive.mount("/content/MyDrive", force_remount=True).


In [2]:
import json
from langchain.docstore.document import Document


# --- Loading the documents from JSONL (when you're ready to work on RAG) ---
# Each non-empty line of the file is one JSON object with "page_content" and
# "metadata" keys; it is turned back into a LangChain Document.

file_path_jsonl = "/content/MyDrive/MyDrive/Customer Service Chatbot Assistance/knowledge_base_articles.jsonl"
loaded_documents = []
with open(file_path_jsonl, 'r', encoding='utf-8') as f:
    for line in f:
        line = line.strip()  # removes leading/trailing whitespace, including the newline
        if not line:
            # Blank lines (e.g. a trailing empty line) are not records;
            # json.loads('') would raise, so skip them.
            continue
        data = json.loads(line)
        # Reconstruct the LangChain Document object
        doc = Document(
            page_content=data["page_content"],
            metadata=data["metadata"]
        )
        loaded_documents.append(doc)

print(f"\nLoaded {len(loaded_documents)} documents from {file_path_jsonl}")
if loaded_documents:
    # Only index into the list when something was actually loaded.
    print("First loaded document example:")
    print(loaded_documents[0])
    print(f"Type of loaded document: {type(loaded_documents[0])}")

# You can verify that the content and metadata are correctly loaded
# print(loaded_documents[0].page_content)
# print(loaded_documents[0].metadata['title'])


Loaded 296 documents from /content/MyDrive/MyDrive/Customer Service Chatbot Assistance/knowledge_base_articles.jsonl
First loaded document example:
page_content='Many users including ourselves, are having problems installing adobe flash with firefox, it keeps displaying the security errors from Internet explorer although you're installing it for firefox, correct? well the simple and quickiest solution is to use this full installer instead on your windows VPS or dedicated server:http://download.macromedia.com/pub/flashplayer/latest/help/install_flash_player.exe' metadata={'title': 'Adobe flash Internet explorer security error', 'source': 'https://clients.hostsailor.com/index.php?rp=/knowledgebase/130/Adobe-flash-Internet-explorer-security-error.html'}
Type of loaded document: <class 'langchain_core.documents.base.Document'>


In [4]:
# Preview the first 15 loaded documents.
loaded_documents[:15]

[Document(metadata={'title': 'Adobe flash Internet explorer security error', 'source': 'https://clients.hostsailor.com/index.php?rp=/knowledgebase/130/Adobe-flash-Internet-explorer-security-error.html'}, page_content="Many users including ourselves, are having problems installing adobe flash with firefox, it keeps displaying the security errors from Internet explorer although you're installing it for firefox, correct? well the simple and quickiest solution is to use this full installer instead on your windows VPS or dedicated server:http://download.macromedia.com/pub/flashplayer/latest/help/install_flash_player.exe"),
 Document(metadata={'title': 'Disk space upgrade', 'source': 'https://clients.hostsailor.com/index.php?rp=/knowledgebase/129/Disk-space-upgrade.html'}, page_content="Once you upgrade your VPS the RAM and CPU are easily upgradable, with the disk it's a different matter:With OpenVZ your disk space will be upgraded using the same partition.With XEN Linux we can also upgrade 

### Cleaning the dataset.
- Remove the HTML tags.
- Move the links found inside the articles into the metadata and remove them from the article text itself.

In [5]:
# Copy the raw knowledge-base JSONL from Drive into the current working directory.
!cp "/content/MyDrive/MyDrive/Customer Service Chatbot Assistance/knowledge_base_articles.jsonl" knowledge_base_articles.jsonl

#### Adding a `type` field (`FAQ` or `Tutorial`) to each document's metadata

In [6]:
def tag_documents_with_type(documents, faq_count: int = 9):
    """
    Adds a 'type' field to each document's metadata.
    The first `faq_count` documents are tagged as 'FAQ', the rest as 'Tutorial'.

    The documents are modified in place and the very same list object is
    returned, so the list passed by the caller ends up tagged as well.

    Parameters:
        documents (list): List of Document objects (any object exposing a
            mutable `metadata` dict works).
        faq_count (int): Number of leading documents tagged as 'FAQ'.
            Defaults to 9.
            NOTE(review): 9 presumably matches how many FAQ articles come
            first in the source file -- confirm against the dataset.

    Returns:
        list: The list that was passed in, with updated metadata.
    """
    for i, doc in enumerate(documents):
        doc.metadata['type'] = 'FAQ' if i < faq_count else 'Tutorial'
    return documents


# Tag every document in place. The function returns the list it was given,
# so add_type_docs refers to the same list object as loaded_documents.
add_type_docs = tag_documents_with_type(loaded_documents)

In [7]:
# Sanity check: number of documents after tagging.
len(add_type_docs)

296

In [8]:
# Preview the first 15 documents to check the new 'type' metadata field.
add_type_docs[:15]

[Document(metadata={'title': 'Adobe flash Internet explorer security error', 'source': 'https://clients.hostsailor.com/index.php?rp=/knowledgebase/130/Adobe-flash-Internet-explorer-security-error.html', 'type': 'FAQ'}, page_content="Many users including ourselves, are having problems installing adobe flash with firefox, it keeps displaying the security errors from Internet explorer although you're installing it for firefox, correct? well the simple and quickiest solution is to use this full installer instead on your windows VPS or dedicated server:http://download.macromedia.com/pub/flashplayer/latest/help/install_flash_player.exe"),
 Document(metadata={'title': 'Disk space upgrade', 'source': 'https://clients.hostsailor.com/index.php?rp=/knowledgebase/129/Disk-space-upgrade.html', 'type': 'FAQ'}, page_content="Once you upgrade your VPS the RAM and CPU are easily upgradable, with the disk it's a different matter:With OpenVZ your disk space will be upgraded using the same partition.With 

#### Save `add_type_docs` to a `.jsonl` file

In [122]:
# Save the type-tagged knowledge base (the article text has not been cleaned yet at this stage)

import json
from langchain.docstore.document import Document

print(f"Initial documents to save: {len(add_type_docs)}")

# --- Write one JSON object per line (JSONL) ---
# NOTE(review): "meatadata" looks like a typo for "metadata"; confirm no
# downstream notebook reads this exact filename before renaming it.
file_path_jsonl = "meatadata_type_kb_articles.jsonl"
with open(file_path_jsonl, 'w', encoding='utf-8') as f:
    for doc in add_type_docs:
        # Only the two Document attributes needed to rebuild it are stored.
        # ensure_ascii=False keeps non-ASCII characters (e.g., Arabic) readable.
        record = json.dumps(
            {"page_content": doc.page_content, "metadata": doc.metadata},
            ensure_ascii=False,
        )
        f.write(record + '\n')
print(f"Documents successfully saved to {file_path_jsonl}")

Initial documents to save: 296
Documents successfully saved to cleaned_kb_articles.jsonl


#### Text Processing and Chunking.
- detect the links in the articles, remove them from the article text and add them to the `metadata` under `references`
- clean the documents
- split them into chunks
- add representative tags to each chunk according to keywords in the text itself

In [9]:
import re
import html
import uuid
from typing import List, Dict
from urllib.parse import urlparse
from langchain.schema import Document

# Pattern to extract full URLs: "http://" or "https://" followed by every
# character up to the next whitespace, quote (' or ") or angle bracket.
# Trailing punctuation such as "." or ")" is therefore part of the match.
FULL_URL_PATTERN = re.compile(r'https?://[^\s\'"<>]+')

def get_base_domain(url: str) -> str:
    """
    Extracts the clean base domain from a URL.

    The network location is lower-cased (host names are case-insensitive) and
    a single leading "www." is dropped, so "https://WWW.Example.com/x" and
    "http://example.com" yield the same value.

    Parameters:
        url (str): URL to inspect; may be empty or not a URL at all.

    Returns:
        str: The normalised network location, or '' when the URL has none
        or cannot be parsed.
    """
    try:
        netloc = urlparse(url).netloc.lower()
    except ValueError:
        # urlparse raises ValueError on malformed hosts (e.g. an unbalanced "[").
        return ''
    # Strip only a *leading* "www."; a plain str.replace would also mangle
    # hosts that merely contain that text, such as "awww.example.com".
    if netloc.startswith('www.'):
        netloc = netloc[len('www.'):]
    return netloc

def clean_and_extract_urls(doc: Document) -> Document:
    """
    Removes HTML, extracts URLs, classifies internal vs external links.
    Adds references to metadata.

    The document is modified in place: `doc.metadata['references']` is
    extended with every distinct URL found in the content (in order of first
    appearance) and `doc.page_content` is replaced by the cleaned text.

    A URL is "internal" when its base domain equals the base domain of
    `doc.metadata['source']`; internal URLs are replaced by the placeholder
    "[our client area link]", all other URLs are removed from the text.

    Parameters:
        doc (Document): Document whose content and metadata are updated.

    Returns:
        Document: The same `doc` object that was passed in.
    """
    original_content = doc.page_content

    # Ensure references list exists
    references = doc.metadata.get('references')
    if not isinstance(references, list):
        references = []
        doc.metadata['references'] = references

    base_domain = get_base_domain(doc.metadata.get('source', ''))

    # Step 1: Extract full URLs.
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # stored references are identical from one run to the next (a set has no
    # stable order for strings across interpreter sessions).
    full_urls = list(dict.fromkeys(FULL_URL_PATTERN.findall(original_content)))
    references.extend(full_urls)

    # Step 2: Clean content (drop HTML tags, collapse whitespace runs)
    cleaned_content = re.sub(r'<[^>]+>', '', original_content)
    cleaned_content = ' '.join(cleaned_content.split())

    # Step 3: Replace/remove links.
    # Longest URLs go first: otherwise a URL that is a prefix of another one
    # would be replaced first and leave the tail of the longer URL behind.
    for url in sorted(full_urls, key=len, reverse=True):
        if base_domain and get_base_domain(url) == base_domain:
            cleaned_content = cleaned_content.replace(url, "[our client area link]")
        else:
            cleaned_content = cleaned_content.replace(url, "")

    # Step 4: Remove broken paths (URL fragments not preceded by a word character).
    # NOTE(review): the first pattern matches any "/"-led path, so genuine
    # filesystem paths in the text (e.g. "/etc/fstab") are removed as well --
    # confirm this is intended for tutorial articles.
    broken_path_patterns = [
        r'/[a-zA-Z0-9_\-\./]*(?:\.html|\.php|\.exe|\.zip|\.tar\.gz|\.pdf|\.txt|\.gpg)?(?:\?[\w=&.\-]*)*',
        r'clientarea\.php(?:\?[\w=&.\-]*)*',
        r'index\.php(?:\?[\w=&.\-]*)*'
    ]
    for pattern in broken_path_patterns:
        cleaned_content = re.sub(r'(?<!\w)' + pattern, '', cleaned_content)

    doc.page_content = cleaned_content
    return doc

def clean_document(doc: Document) -> Document:
    """
    Builds a cleaned copy of a document; the input document is left untouched.

    Metadata: shallow-copied, with HTML entities in 'title' unescaped when a
    title is present.
    Content: non-breaking spaces and CRLF line endings are normalised, the
    "you can view/watch the video kb/tutorial ... by clicking here" sentence
    is removed (case-insensitively), and all whitespace runs are collapsed
    to single spaces.

    Parameters:
        doc (Document): Source document.

    Returns:
        Document: New document holding the cleaned content and metadata.
    """
    metadata = doc.metadata.copy()
    if 'title' in metadata:
        metadata['title'] = html.unescape(metadata['title'])

    # Normalise special whitespace before removing the boilerplate sentence.
    text = doc.page_content.replace('\xa0', ' ').replace('\r\n', '\n')
    text = re.sub(
        r"you can (view|watch) the video (kb|tutorial) for this topic by clicking here\.?",
        '',
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(r'\s+', ' ', text).strip()

    return Document(page_content=text, metadata=metadata)

# === Existing processing functions ===

def clean_text(text: str) -> str:
    """
    Normalises formatting of a text in three ordered passes.

    1. Every run of whitespace (including real newlines) becomes one space.
    2. Each literal backslash followed by "n" becomes a real newline.
    3. HTML-like tags ("<...>") are removed.

    The result is stripped of leading and trailing whitespace.
    """
    # Order matters: pass 2 creates newlines that pass 1 must not collapse.
    substitutions = (
        (r"\s+", " "),
        (r"\\n", "\n"),
        (r"<[^>]+>", ""),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()

def split_into_chunks(text: str) -> List[str]:
    """
    Splits a text into chunks and keeps only those longer than 10 words.

    The text is cut at "step <number>" headings (case-insensitive), at a run
    of '#' followed by whitespace at the very start of the text, and at
    newlines that are followed by a non-space character. A "step <number>"
    heading is joined with the piece that follows it as "heading\\ncontent";
    every other piece is kept on its own after stripping.
    """
    pieces = iter(re.split(r"(?i)(step\s+\d+|^#+\s+|\n(?=\S))", text))
    assembled = []
    for piece in pieces:
        if re.match(r"(?i)^step\s+\d+", piece):
            # Consume the following piece as the body of this step
            # (empty when the heading is the last piece).
            body = next(pieces, "").strip()
            assembled.append(f"{piece.strip()}\n{body}")
        else:
            assembled.append(piece.strip())
    return [candidate for candidate in assembled if len(candidate.split()) > 10]

def extract_commands(text: str) -> List[str]:
    """
    Returns every line of `text` that starts with a known shell command.

    A line qualifies when it begins with "sudo", "docker", "apt-get" or
    "curl" and has at least one more character after that keyword. The whole
    line, up to but excluding the newline, is returned.

    Parameters:
        text (str): Text to scan, possibly spanning several lines.

    Returns:
        List[str]: Matching lines in order of appearance.
    """
    # The group must be non-capturing: with a capturing group re.findall
    # returns only the group ("sudo"), not the full command line.
    return re.findall(r"(?m)^(?:sudo|docker|apt-get|curl)[^\n]+", text)

# (tag, compiled pattern) pairs, listed in the order tags are emitted.
# Patterns are searched in the lower-cased text. Most keywords are plain
# substrings; "ip" and "mac" must not touch another letter, because as bare
# substrings they also hit unrelated words ("script", "unzip", "machine").
_TAG_RULES = [
    (tag, re.compile(pattern))
    for tag, pattern in (
        ("docker", r"docker"),
        ("linux", r"ubuntu|debian|linux"),
        ("ubuntu", r"ubuntu"),
        ("debian", r"debian"),
        ("ssl", r"ssl"),
        ("isp-manager", r"isp manager"),
        ("vps", r"vps"),
        ("ip", r"(?<![a-z])ip(?:s|v[46])?(?![a-z])"),
        ("ffmpeg", r"ffmpeg"),
        ("centos", r"centos"),
        ("ddos", r"ddos"),
        ("windows", r"windows"),
        ("php", r"php"),
        ("web", r"web"),
        ("email", r"email"),
        ("mac", r"(?<![a-z])mac(?:s|os)?(?![a-z])"),
        ("mysql", r"mysql"),
        ("wordpress", r"wordpress"),
        ("error", r"error"),
        ("ssh", r"ssh"),
        ("mongodb", r"mongodb"),
        ("openvz", r"openvz"),
        ("xenpv", r"xenpv"),
        ("kvm", r"kvm"),
    )
]


def tag_chunk(text: str) -> List[str]:
    """
    Returns the topic tags whose keyword occurs in `text` (case-insensitive).

    Tags come back in a fixed order (the order of `_TAG_RULES`). The "linux"
    tag is added when any of "ubuntu", "debian" or "linux" occurs. "ip"
    (also "ips", "ipv4", "ipv6") and "mac" (also "macs", "macos") only count
    when not adjacent to another letter. When nothing matches, the single
    tag "other" is returned.

    Parameters:
        text (str): Chunk text to classify.

    Returns:
        List[str]: Matching tags, or ["other"].
    """
    text_lower = text.lower()
    tags = [tag for tag, pattern in _TAG_RULES if pattern.search(text_lower)]
    return tags or ["other"]

# === Main integration function ===

def preprocess_document(doc: Document) -> List[Dict]:
    """
    Runs the full cleaning pipeline on one document and returns its chunks.

    Steps: clean_document -> clean_and_extract_urls -> clean_text ->
    split_into_chunks. Each resulting chunk becomes a dict with the keys
    "id" (a random UUID4 string, so it differs on every run), "content",
    "source", "title", "type", "chunk_index", "commands", "tags" and
    "references".

    Parameters:
        doc (Document): Document to process.

    Returns:
        List[Dict]: One dict per chunk, in chunk order; empty when no chunk
        survives the length filter in split_into_chunks.
    """
    # Step 1: Clean metadata and remove noise
    doc = clean_document(doc)

    # Step 2: Extract and clean links, update metadata
    doc = clean_and_extract_urls(doc)

    # Step 3: Clean additional formatting
    cleaned = clean_text(doc.page_content)

    # Step 4: Split and tag chunks
    source = doc.metadata.get("source")
    title = doc.metadata.get("title")
    doc_type = doc.metadata.get("type")
    references = doc.metadata.get("references", [])

    return [
        {
            "id": str(uuid.uuid4()),
            "content": chunk,
            "source": source,
            "title": title,
            "type": doc_type,
            "chunk_index": i,
            "commands": extract_commands(chunk),
            "tags": tag_chunk(chunk),
            # Each chunk gets its own copy; sharing one list object would
            # make an edit to one chunk's references show up in all of them.
            "references": list(references),
        }
        for i, chunk in enumerate(split_into_chunks(cleaned))
    ]

def preprocess_all(documents: List[Document]) -> List[Dict]:
    """
    Applies preprocess_document to every document and flattens the results.

    Parameters:
        documents (List[Document]): Documents to process, in order.

    Returns:
        List[Dict]: All chunk dicts, grouped by document in input order.
    """
    return [
        chunk
        for document in documents
        for chunk in preprocess_document(document)
    ]

In [10]:
# Run the cleaning / chunking / tagging pipeline over every type-tagged document.
processed_chunks = preprocess_all(add_type_docs)

In [11]:
# Total number of chunks produced across all documents.
len(processed_chunks)

339

In [15]:
# Inspect the first 22 processed chunks.
processed_chunks[:22]

[{'id': '649fbecd-0f97-4d02-a205-99967d8c6c36',
  'content': "Many users including ourselves, are having problems installing adobe flash with firefox, it keeps displaying the security errors from Internet explorer although you're installing it for firefox, correct? well the simple and quickiest solution is to use this full installer instead on your windows VPS or dedicated server:",
  'source': 'https://clients.hostsailor.com/index.php?rp=/knowledgebase/130/Adobe-flash-Internet-explorer-security-error.html',
  'title': 'Adobe flash Internet explorer security error',
  'type': 'FAQ',
  'chunk_index': 0,
  'commands': [],
  'tags': ['vps', 'windows', 'error'],
  'references': ['http://download.macromedia.com/pub/flashplayer/latest/help/install_flash_player.exe']},
 {'id': '335632b2-b23c-42f3-baf8-2d114bf05f9e',
  'content': "Once you upgrade your VPS the RAM and CPU are easily upgradable, with the disk it's a different matter:With OpenVZ your disk space will be upgraded using the same pa

### Save the data to a `.json` file for further use.

In [13]:
import json

# Serialise all chunks as one pretty-printed JSON array, keeping non-ASCII
# characters as-is, then write it out as UTF-8.
serialized_chunks = json.dumps(processed_chunks, ensure_ascii=False, indent=2)
with open("preprocessed_chunks.json", "w", encoding="utf-8") as f:
    f.write(serialized_chunks)