In [None]:
# Warning control
# Silences every Python warning for the rest of the kernel session.
import warnings
warnings.filterwarnings('ignore')

from IPython.display import display
from IPython.display import clear_output
import json
# Shell escape: installs NLTK before it is imported below.
!pip install nltk

import nltk
from transformers import pipeline

from nltk.corpus import wordnet as wn
# Corpora fetched for the `wn.synsets` lookups performed in get_synonyms.
nltk.download('wordnet')
nltk.download('omw-1.4')

# Wipe the install/download logs printed above from the cell output.
clear_output()

In [None]:
# Cell 2 Extract documentation texts
from pathlib import Path

# Initialize the Hugging Face pipelines once at module level; summarize_purpose
# and extract_metedata_from_docs in later cells read these globals.
# device=-1 keeps both models on the CPU (the cell output reports
# "Device set to use cpu" for each of them).
summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=-1)
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli", device=-1)
# Candidate labels passed to the zero-shot classifier for the "doc_type" field.
DOC_TYPE_LABELS = ["concept", "task", "reference", "overview", "tutorial"]
# Candidate labels passed to the zero-shot classifier for the "persona" field.
PERSONA_LABELS = ["API Developer", "Storage Administrator", "DevOps Engineer", "Systems Administrator", "IT Director"]


def extract_texts(repo_path, extensions=("adoc", "json", "yaml", "yml")):
    """
    Recursively collect the text of every file under ``repo_path`` that has one
    of the given extensions.

    Args:
        repo_path: Directory to scan (str or Path).
        extensions: File extensions without the leading dot. A single string
            such as ``"adoc"`` is accepted and treated as one extension.

    Returns:
        list[tuple[str, str, str]]: One ``(path, content, extension)`` tuple per
        file whose content is not blank, grouped by extension in the order
        given and sorted by path inside each group. Files that cannot be read
        are reported on stdout and skipped.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(extensions, str):
        extensions = (extensions,)

    texts = []
    for ext in extensions:
        # rglob yields entries in filesystem order and also matches directories
        # whose name ends with the extension. Keep regular files only and sort
        # them so repeated runs produce the same list.
        file_paths = sorted(
            candidate
            for candidate in Path(repo_path).rglob(f"*.{ext}")
            if candidate.is_file()
        )
        print(f"Found {len(file_paths)} .{ext} files")

        for path in file_paths:
            try:
                # errors='ignore' drops undecodable bytes instead of failing.
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except OSError as e:
                print(f"Error reading {path}: {e}")
                continue
            if content.strip():  # Only add non-empty files
                texts.append((str(path), content, ext))

    return texts


def extract_adoc_texts(repo_path):
    """
    Extract texts specifically from .adoc files in the given repository path.

    Thin wrapper around ``extract_texts`` so both entry points share one
    implementation.

    Args:
        repo_path: Directory to scan (str or Path).

    Returns:
        list[tuple[str, str, str]]: ``(path, content, 'adoc')`` tuples, sorted
        by path.
    """
    return extract_texts(repo_path, extensions=("adoc",))

def docs_to_json(docs, filename):
    """
    Export ``docs`` to ``filename`` as pretty-printed UTF-8 JSON.

    The file is written to ``filename`` exactly as given; no extension is
    appended. Failures are reported on stdout instead of being raised, so a
    failed export does not stop the notebook.

    Args:
        docs: Any JSON-serialisable object.
        filename: Destination path (str or path-like).
    """
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII characters readable in the file.
            json.dump(docs, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error exporting docs to '{filename}': {e}")
    else:
        # Report the path that was really written (the old message appended
        # a second ".json" that never existed on disk).
        print(f"Extracted docs exported to '{filename}'")

# Root folder of the documentation dataset, relative to the notebook.
DATASET_DIR = "../data/bluexp-dataset"

# Every supported file type; consumed by the metadata extraction cell below.
docs = extract_texts(DATASET_DIR)

# Snapshot of the AsciiDoc sources only. docs_to_json writes to the name it is
# given verbatim, so the ".json" extension has to be part of the file name.
docs_to_json(extract_adoc_texts(DATASET_DIR), "cell_2_extracted_adoc_docs.json")



Device set to use cpu
Device set to use cpu


Found 158 .adoc files
Found 16 .json files
Found 1 .yaml files
Found 18 .yml files
Found 158 .adoc files
✅ Extracted docs exported to 'cell_2_extracted_adoc_docs.json'


In [None]:
# Cell 3 metadata 

import os
from datetime import datetime
import re
import hashlib
import uuid
from sklearn.feature_extraction.text import TfidfVectorizer


#CSV Header
# file_name (url)
# file_ext (json, adoc, etc.)
# uuid
# repo_name
# last_modified
# file_purpose_use ("task, concept")
# summary (purpose)
# chapter
# keywords
# persona (one of PERSONA_LABELS: API Developer, Storage Administrator, DevOps Engineer, Systems Administrator, IT Director)
# synonyms
# section heading


# Extracted
def split_frontmatter(text):
    """
    Split a leading ``---``-delimited YAML front matter block from the body.

    Args:
        text (str): Raw file content.

    Returns:
        tuple: ``(front, body)``. ``front`` is the parsed front matter (``{}``
        when the block is missing or empty) and ``body`` is the text after the
        closing ``---``. When ``text`` does not start with a complete front
        matter block it is returned unchanged as ``body``.
    """
    front, body = {}, text
    if text.startswith('---'):
        # maxsplit=2 -> ['', <front matter>, <rest>] when a closing '---' exists.
        parts = text.split('---', 2)
        if len(parts) >= 3:
            # Imported here because no cell of the notebook imports PyYAML, so
            # the module-level name `yaml` is undefined on a fresh kernel.
            import yaml
            front = yaml.safe_load(parts[1]) or {}
            body = parts[2]
    return front, body

# Only for .adoc bodies
def extract_title(body):
    """
    Return the document title of an AsciiDoc body.

    The title is the first line that starts with ``'= '``; leading ``=`` and
    space characters and surrounding whitespace are removed from it.

    Returns:
        str | None: The title text, or None when no such line exists.
    """
    title_lines = (row for row in body.splitlines() if row.startswith('= '))
    first = next(title_lines, None)
    if first is None:
        return None
    return first.lstrip('= ').strip()

def extract_headings(body, level=2):
    """
    Return the titles of all AsciiDoc headings written with exactly ``level``
    ``=`` characters.

    Args:
        body (str): AsciiDoc text.
        level (int): Number of ``=`` characters that introduce the heading.

    Returns:
        list[str]: Heading titles in document order, without trailing
        whitespace.
    """
    # [ \t]+ instead of \s+: \s also matches '\n', which let an empty "=="
    # line swallow the following line as its title. \S forces a real title.
    pattern = rf"^{'=' * level}[ \t]+(\S.*)"
    # rstrip drops trailing spaces and the '\r' of CRLF line endings.
    return [title.rstrip() for title in re.findall(pattern, body, flags=re.MULTILINE)]

def extract_prerequisites(body):
    """
    Return the items listed under a "Prerequisites" heading.

    Capturing starts after a heading (two or more ``=``) whose title begins
    with "Prerequisites" and stops at the next heading of any level.

    Args:
        body (str): AsciiDoc text.

    Returns:
        list[str]: One entry per non-blank line of the section, with list
        markers (``-``, ``*``) and surrounding whitespace removed. Lines that
        consist only of marker characters are skipped.
    """
    items = []
    capturing = False
    for line in body.splitlines():
        if re.match(r"^==+\s+Prerequisites", line):
            capturing = True
            continue
        if not capturing:
            continue
        if re.match(r"^==+\s+", line):
            break
        # Strip indentation first so tab-indented bullets lose their marker too.
        item = line.strip().strip('-* ').strip()
        # Lines such as "----" or "*" reduce to nothing; do not emit "" entries.
        if item:
            items.append(item)
    return items

def estimate_reading_time(body, wpm=200):
    """
    Estimate the reading time of ``body`` in minutes.

    The number of ``\\w+`` runs is floor-divided by ``wpm``; the result is never
    lower than 1.
    """
    word_count = sum(1 for _ in re.finditer(r"\w+", body))
    minutes = word_count // wpm
    return minutes if minutes > 1 else 1

def extract_api_endpoints(body):
    """
    Find "<HTTP METHOD> <path>" mentions in ``body``.

    Recognises the upper-case methods GET, POST, PUT, DELETE and PATCH followed
    by whitespace and a path that starts with ``/``.

    Returns:
        list[str]: Matches in document order, formatted as "METHOD /path".
    """
    endpoint_re = re.compile(r"\b(GET|POST|PUT|DELETE|PATCH)\s+(/[\w_{}\-\[\]/]+)")
    endpoints = []
    for hit in endpoint_re.finditer(body):
        method, route = hit.groups()
        endpoints.append(f"{method} {route}")
    return endpoints
    

def extract_tags_tfidf(corpus, top_n=5):
    """
    Pick the highest-weighted TF-IDF terms of each document as tags.

    Args:
        corpus (list[str]): Document texts.
        top_n (int): Maximum number of tags per document.

    Returns:
        list[list[str]]: One tag list per entry of ``corpus``, in the same
        order. Documents without any word characters get ``[]``; so does every
        document when the vectorizer cannot build a vocabulary.
    """
    tags = [[] for _ in corpus]
    # Only documents containing at least one word character are vectorized.
    idxs = [i for i, doc in enumerate(corpus) if re.search(r"\w+", doc)]
    if not idxs:
        return tags

    vect = TfidfVectorizer(max_df=0.8, stop_words='english', ngram_range=(1, 2))
    try:
        X = vect.fit_transform([corpus[i] for i in idxs])
    except ValueError as e:
        # scikit-learn raises ValueError when pruning leaves no vocabulary,
        # e.g. a one-document corpus with max_df=0.8 or stop-word-only texts.
        print(f"TF-IDF tagging skipped: {e}")
        return tags

    feats = vect.get_feature_names_out()
    for row, orig in enumerate(idxs):
        weights = X[row].toarray().ravel()
        top = weights.argsort()[::-1][:top_n]
        # Skip zero-weight features: a document with fewer than top_n terms
        # would otherwise be tagged with words it does not contain.
        tags[orig] = [str(feats[j]) for j in top if weights[j] > 0]
    return tags

#Summarizer
def summarize_purpose(text):
    """
    Summarize the opening of ``text`` with the module-level ``summarizer``.

    Args:
        text (str): Document content; only its first 1000 characters are sent
            to the summarizer.

    Returns:
        str: The ``summary_text`` of the first result, stripped.

    Raises:
        ValueError: If ``text`` is not a string.
    """
    if isinstance(text, str):
        results = summarizer(text[:1000], max_length=60, min_length=10, do_sample=False)
        return results[0]['summary_text'].strip()
    raise ValueError(f"Expected a string, but got {type(text)}")

# Extract section headings
def extract_section_headings(content):
    """
    List the section headings of an AsciiDoc document.

    A heading is a line that, once surrounding whitespace is removed, starts
    with two or more '=' characters followed by whitespace and a title. The
    single-'=' document title is not included.

    The reported level is the number of '=' characters, so:

        == Section A            -> "Level 2: Section A"
        === Subsection A.1      -> "Level 3: Subsection A.1"
        ==== Sub-subsection     -> "Level 4: Sub-subsection"

    Args:
        content (str): The text to scan.

    Returns:
        list: Strings formatted as "Level {level}: {title}", in document order.
    """
    heading_re = re.compile(r"^(==+)\s+(.*)")
    matches = (heading_re.match(raw.strip()) for raw in content.splitlines())
    return [
        f"Level {len(found.group(1))}: {found.group(2).strip()}"
        for found in matches
        if found
    ]

def get_last_modified_datetime(file_path):
    """
    Return the modification time of ``file_path`` as a ``datetime``.

    Problems are reported on stdout rather than raised.

    Returns:
        datetime | None: The value of ``datetime.fromtimestamp`` for the file's
        mtime, or None when the file is missing or the lookup fails.
    """
    modified = None
    try:
        modified = datetime.fromtimestamp(os.path.getmtime(file_path))
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except Exception as e:
        print(f"Error retrieving last modified datetime for {file_path}: {e}")
    return modified

# Synonym expansion using WordNet
def get_synonyms(word):
    """
    Look up at most three WordNet synonyms for ``word``.

    Lemma names are lower-cased with underscores turned into spaces. A
    candidate is kept when it differs from the lower-cased ``word``, has not
    been collected yet and is longer than two characters.

    Returns:
        list[str]: Up to three synonyms, in the order WordNet yields them.
    """
    target = word.lower()
    collected = []
    for synset in wn.synsets(word):
        for lemma in synset.lemmas():
            candidate = lemma.name().replace('_', ' ').lower()
            is_new = (
                candidate != target
                and candidate not in collected
                and len(candidate) > 2
            )
            if is_new:
                collected.append(candidate)
            # Stop as soon as the third synonym has been gathered.
            if len(collected) == 3:
                return collected
    return collected


def generate_synonym_data(title, keywords, section_headings):
    """
    Derive rule-based synonyms from a document's title, keywords and headings.

    Best fit sources for synonym_data extraction:

    keywords: main base
    title: most direct paraphrasable noun-verb pair
    section_headings: reinforce and diversify search terms

    Args:
        title (str | None): Document title; None is ignored.
        keywords (list[str] | None): Keyword strings.
        section_headings (list[str] | None): Heading strings.

    Returns:
        list[str]: Unique synonyms triggered by the rules below, sorted so the
        result does not depend on set iteration order.
    """
    # (substring to look for, synonym to add) -- naive examples; add more rules here.
    rules = (
        ("create", "generate"),
        ("token", "access key"),
    )
    candidates = list(keywords or []) + [title] + list(section_headings or [])
    # A missing title (None) or any other non-string entry is skipped instead
    # of crashing on .lower().
    base_terms = {term.lower() for term in candidates if isinstance(term, str)}
    enriched = {
        synonym
        for needle, synonym in rules
        if any(needle in term for term in base_terms)
    }
    return sorted(enriched)


def generate_uuid(path: str) -> uuid.UUID:
    """
    Build a content-derived UUID for the file at ``path``.

    The file is read in binary mode, hashed with SHA-256, and the hex digest is
    used as the name of a version-5 UUID in the URL namespace. Identical file
    contents therefore yield identical UUIDs.
    """
    hasher = hashlib.sha256()
    with open(path, 'rb') as handle:
        hasher.update(handle.read())
    return uuid.uuid5(uuid.NAMESPACE_URL, hasher.hexdigest())




In [None]:
import csv


#CSV Header
# file_name (url)
# file_ext (json, adoc, etc.)
# uuid
# repo_name
# last_modified
# file_purpose_use ("task, concept")
# summary (purpose) Using facebook/bart-large-cnn
# doc_type (concept, task, reference, overview, tutorial) Using facebook/bart-large-mnli
# chapter TBD
# keywords
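# persona (one of PERSONA_LABELS: API Developer, Storage Administrator, DevOps Engineer, Systems Administrator, IT Director) Using facebook/bart-large-mnli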
# synonyms (title, keywords, section headings) using NLTK WordNet a lexical database for the English language
# section heading (subheadings of adoc files)
def _front_matter_value(lines, key):
    """Return the stripped text after ``key:`` on the first line containing it, else None."""
    marker = f"{key}:"
    for line in lines:
        if marker in line:
            return line.split(marker)[-1].strip()
    return None


def _synonym_base_terms(title, keywords, section_headings):
    """Collect the lower-case words of title, keywords and headings to expand into synonyms."""
    # Short filler words that would only add noise to the synonym lookup.
    stop_words = {"and", "or", "so", "for", "user"}

    def keep(term):
        return len(term) > 2 and term not in stop_words

    terms = {word.lower() for word in title.split() if keep(word.lower())}
    terms.update(kw.lower() for kw in keywords if keep(kw.lower()))
    for heading in section_headings:
        # Headings arrive as "Level N: <title>"; drop that label so the word
        # "level" is not expanded into synonyms for every document.
        heading_text = re.sub(r"^Level \d+:\s*", "", heading)
        terms.update(word for word in re.findall(r"\b\w+\b", heading_text.lower()) if keep(word))
    return terms


def extract_metedata_from_docs(docs):
    """
    Extract metadata from the documentation texts.

    Args:
        docs: Iterable of ``(path, content, extension)`` entries as produced by
            ``extract_texts``.

    Returns:
        list[dict]: One record per entry, in input order, with the keys
        file_path, file_name, repo_name, ext, title, uuid, summary, keywords,
        persona, doc_type, last_modified, section_headings, synonym_data and
        tags.
    """
    # TF-IDF runs on the text without YAML front matter for .adoc files.
    corpus = [split_frontmatter(content)[1] if ext == 'adoc' else content for _, content, ext in docs]
    tag_lists = extract_tags_tfidf(corpus)

    metadata_list = []
    # enumerate replaces docs.index(entry), which rescanned the list for every
    # document and returned the first match for duplicate entries.
    for index, (path, content, fmt) in enumerate(docs):
        lines = content.splitlines()
        path_parts = Path(path).parts

        metadata = {}
        metadata["file_path"] = path
        metadata["file_name"] = Path(path).name
        # NOTE(review): index 2 is the third path component. For paths under
        # "../data/bluexp-dataset/" that is the dataset folder itself -- confirm
        # which component is meant to be the repository name.
        metadata["repo_name"] = path_parts[2] if len(path_parts) > 2 else None
        metadata["ext"] = fmt

        # Title: first "= " line. Slice the marker off instead of replacing
        # every "= " so titles such as "Set x = y" stay intact.
        title_line = next((line for line in lines if line.startswith("= ")), "")
        metadata["title"] = title_line[2:].strip()

        # Front matter style "key: value" lines.
        metadata["uuid"] = _front_matter_value(lines, "uuid")
        metadata["summary"] = _front_matter_value(lines, "summary")
        raw_keywords = _front_matter_value(lines, "keywords")
        # Empty fragments (e.g. a trailing comma or an empty value) are dropped.
        metadata["keywords"] = (
            [kw.strip() for kw in raw_keywords.split(",") if kw.strip()]
            if raw_keywords
            else []
        )

        # Summarize once and reuse the text for both zero-shot classifications;
        # the summarizer call is the expensive part of the loop.
        purpose = summarize_purpose(content)
        metadata["persona"] = classifier(purpose, PERSONA_LABELS)['labels'][0]
        metadata["doc_type"] = classifier(purpose, DOC_TYPE_LABELS)['labels'][0]

        # Add last modified timestamp (convert datetime to string)
        last_modified = get_last_modified_datetime(path)
        metadata["last_modified"] = last_modified.isoformat() if last_modified else None
        metadata["section_headings"] = extract_section_headings(content)

        # Synonyms for the unique terms of title, keywords and section headings.
        base_terms = _synonym_base_terms(
            metadata["title"], metadata["keywords"], metadata["section_headings"]
        )
        synonyms = {
            synonym
            for term in base_terms
            if term.isalpha()
            for synonym in get_synonyms(term)
        }
        # Sorted so the exported string is identical from run to run.
        metadata["synonym_data"] = ", ".join(sorted(synonyms))
        metadata["tags"] = tag_lists[index] if index < len(tag_lists) else []
        metadata_list.append(metadata)

    return metadata_list

# Use the existing 'docs' variable from Cell 2

metadata_results = extract_metedata_from_docs(docs)

# Export metadata_results as JSON (the CSV export happens in the next cell)
docs_to_json(metadata_results, "cell_3_extracted_metadata.json")



Your max_length is set to 60, but your input_length is only 45. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=22)
Your max_length is set to 60, but your input_length is only 45. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=22)
Your max_length is set to 60, but your input_length is only 50. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=25)
Your max_length is set to 60, but your input_length is only 50. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=25)
Your max

In [None]:


# Column order of the exported CSV. It must list every key of a metadata
# record, because csv.DictWriter rejects rows with unknown keys.
csv_headers = [
    "file_path", "file_name", "repo_name", "ext", "title", "uuid", "summary",
    "keywords", "last_modified", "section_headings", "synonym_data", "tags",
    "persona", "doc_type"
]


# Define the CSV file path
csv_file_path = "metadata_output.csv"

# Keys whose values are lists and have to be flattened into one CSV field.
list_fields = ("keywords", "section_headings", "tags")

with open(csv_file_path, mode="w", encoding="utf-8", newline="") as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=csv_headers)
    writer.writeheader()
    for metadata in metadata_results:
        # Flatten on a copy: rewriting metadata_results in place made this cell
        # non-idempotent (a second run joined the already-joined strings
        # character by character).
        row = dict(metadata)
        for field in list_fields:
            if isinstance(row[field], (list, tuple)):
                row[field] = ", ".join(row[field])
        writer.writerow(row)

print(f"Metadata exported to '{csv_file_path}'")

✅ Extracted docs exported to 'cell_3_extracted_metadata.json.json'
Metadata exported to 'cell_3_extracted_metadata.csv'
