In [1]:
import os
import warnings
from sklearn.cluster import AgglomerativeClustering
from sentence_transformers import SentenceTransformer
import numpy as np
from collections import defaultdict
import requests


# Silence every warning for the rest of the process so library warnings do not
# clutter the script output.
warnings.filterwarnings("ignore")


# Directory that holds all input files and receives the generated reports.
# NOTE(review): this looks like a placeholder — set it to the real data directory.
base_dir = "path/to/base"

# Input files read by main().
readme_path = os.path.join(base_dir, "README.txt")
groundtruth_path = os.path.join(base_dir, "nonconforming-material-report-form.txt")
faulty_path = os.path.join(base_dir, "faulty-nonconforming-material-report-form.txt")
fault_description_path = os.path.join(base_dir, "fault-description.xml")

# Output files written by main().
report_file_path = os.path.join(base_dir, "specifications_gap_report.txt")
improvements_file_path = os.path.join(base_dir, "specifications_improvements.txt")

# Sentence-embedding model, instantiated once at module load and used by
# generate_entity_embeddings().
embedding_model = SentenceTransformer("paraphrase-MiniLM-L6-v2")


def load_lines_from_file(file_path):
    """Return the non-blank lines of a UTF-8 text file, whitespace-stripped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A list with one stripped string per non-blank line, in file order.
        If ``file_path`` does not exist, an empty list is returned.
    """
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as handle:
            # Strip each line once, then drop the ones that end up empty.
            stripped = (raw.strip() for raw in handle)
            return [text for text in stripped if text]
    return []

def generate_entity_embeddings(entities):
    """Pair every non-blank entity string with its embedding.

    Args:
        entities: Iterable of strings to embed.

    Returns:
        A list of ``{"text": ..., "embedding": ...}`` dicts, one per entity
        that is not empty or whitespace-only, in input order. The embedding
        is whatever ``embedding_model.encode`` returns for that string.
    """
    records = []
    for candidate in entities:
        if not candidate.strip():
            # Blank entries carry no content worth embedding.
            continue
        vector = embedding_model.encode(candidate)
        records.append({"text": candidate, "embedding": vector})
    return records

def detect_faults(reference_entities, faulty_entities, distance_threshold=1.0):
    """Cluster reference and faulty entities together and report one-sided clusters.

    All entities are clustered jointly on their embeddings. A cluster that
    contains only reference entities is reported as ``"missing"``; a cluster
    that contains only faulty entities is reported as ``"redundant"``.
    Clusters that contain both sources are not reported.

    Args:
        reference_entities: List of dicts with at least an ``"embedding"`` key
            (as produced by ``generate_entity_embeddings``).
        faulty_entities: Same structure, for the file under inspection.
        distance_threshold: Linkage distance passed to
            ``AgglomerativeClustering``; defaults to the previously
            hard-coded value of 1.0.

    Returns:
        A list of ``{"type", "cluster", "entities"}`` dicts. Each entity is a
        copy of the input dict with ``"source"`` and ``"cluster"`` keys added;
        the input dicts themselves are not modified.
    """
    tagged = [{"source": "reference", **e} for e in reference_entities]
    tagged += [{"source": "faulty", **e} for e in faulty_entities]

    if not tagged:
        return []

    if len(tagged) == 1:
        # AgglomerativeClustering refuses to fit fewer than two samples, so a
        # lone entity forms its own cluster without invoking it.
        labels = [0]
    else:
        vectors = np.array([e["embedding"] for e in tagged])
        clustering = AgglomerativeClustering(
            n_clusters=None, distance_threshold=distance_threshold
        ).fit(vectors)
        labels = clustering.labels_

    clusters = defaultdict(list)
    for record, label in zip(tagged, labels):
        # Plain ints keep the result independent of NumPy scalar types.
        record["cluster"] = int(label)
        clusters[record["cluster"]].append(record)

    faults = []
    for cluster_id, members in clusters.items():
        reference_members = [m for m in members if m["source"] == "reference"]
        faulty_members = [m for m in members if m["source"] == "faulty"]
        if reference_members and not faulty_members:
            # Reference specs with no counterpart in the faulty file.
            faults.append({"type": "missing", "cluster": cluster_id, "entities": reference_members})
        elif faulty_members and not reference_members:
            # Faulty-file specs with no counterpart in the reference.
            faults.append({"type": "redundant", "cluster": cluster_id, "entities": faulty_members})

    return faults

def suggest_improvements_batch(faults, readme_text, fault_description_xml,
                               groundtruth_name="nonconforming-material-report-form",
                               faulty_name="faulty-nonconforming-material-report-form",
                               timeout=300):
    """Ask a local Ollama server for one combined improvement suggestion.

    All faults are folded into a single prompt, together with the README and
    fault-description XML text, and sent in one request.

    Args:
        faults: Fault dicts as returned by ``detect_faults``.
        readme_text: README content to include in the prompt.
        fault_description_xml: Raw fault-description XML to include in the prompt.
        groundtruth_name: Display name of the reference specification.
        faulty_name: Display name of the specification under inspection.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        An empty list when ``faults`` is empty; otherwise a one-element list
        holding a dict with ``"fault_type"``, ``"topics"`` and ``"suggestion"``.
        If the request fails, times out, returns a non-200 status, or returns
        a body that is not valid JSON, ``"suggestion"`` is a fixed error
        message instead of an exception being raised.
    """
    if not faults:
        return []

    prompt = _build_improvement_prompt(
        faults, readme_text, fault_description_xml, groundtruth_name, faulty_name
    )

    # Ollama request
    url = "http://localhost:11434/api/generate"
    payload = {
        "model": "mistral",
        "prompt": prompt,
        "stream": False
    }
    fallback = "No suggestion generated due to an error."

    try:
        # Without a timeout an unresponsive server would block forever.
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.exceptions.RequestException:
        # Server unreachable or timed out: degrade the same way as a non-200 reply.
        response = None

    suggestion_text = fallback
    if response is not None and response.status_code == 200:
        try:
            suggestion_text = response.json().get("response", "").strip()
        except ValueError:
            # The body was not valid JSON.
            suggestion_text = fallback

    improvement_suggestions = [{
        "fault_type": "mixed",
        "topics": "Multiple listed above",
        "suggestion": suggestion_text
    }]

    return improvement_suggestions


def _build_improvement_prompt(faults, readme_text, fault_description_xml,
                              groundtruth_name, faulty_name):
    """Assemble the prompt text listing the context and the missing/redundant topics."""
    missing_topics = [", ".join(entity["text"] for entity in f["entities"])
                      for f in faults if f["type"] == "missing"]
    redundant_topics = [", ".join(entity["text"] for entity in f["entities"])
                        for f in faults if f["type"] == "redundant"]

    prompt_parts = [
        "Below are instructions and context for improving specifications.\n\n",
        "README:\n",
        readme_text + "\n\n",
        "Fault Descriptions (from XML):\n",
        fault_description_xml + "\n\n",
        f"Groundtruth Specifications Name: {groundtruth_name}\n",
        f"Faulty Specifications Name: {faulty_name}\n\n",
    ]

    if missing_topics:
        prompt_parts.append("The following important reference (groundtruth) specs are missing in the faulty file:\n")
        prompt_parts.extend(f"- {mt}\n" for mt in missing_topics)

    if redundant_topics:
        prompt_parts.append("\nThe following specs might be redundant or not aligned with the groundtruth:\n")
        prompt_parts.extend(f"- {rt}\n" for rt in redundant_topics)

    prompt_parts.append("\nSuggest improvements to align the faulty specifications with best practices in specification design, as per the groundtruth and the fault descriptions above. "
                        "Please detail how to address missing and redundant specifications.\n")

    return "".join(prompt_parts)


def main():
    """Run the full pipeline: load inputs, detect faults, write both reports.

    Reads the README, fault-description XML, groundtruth and faulty
    specification files from the module-level paths, embeds and clusters the
    specification lines, and writes the gap report and the improvement
    suggestions to ``report_file_path`` and ``improvements_file_path``.
    Progress and output locations are printed to stdout.
    """
    # Load the various input files
    readme_text = _read_text_or_empty(readme_path)
    fault_description_xml = _read_text_or_empty(fault_description_path)

    groundtruth_entities = load_lines_from_file(groundtruth_path)
    faulty_entities = load_lines_from_file(faulty_path)

    # A missing or empty input would otherwise end in a misleading
    # "No faults detected." report, so make the situation visible.
    for label, path, entities in (
        ("Groundtruth", groundtruth_path, groundtruth_entities),
        ("Faulty", faulty_path, faulty_entities),
    ):
        if not entities:
            print(f"Warning: {label} specification file is missing or empty: {path}")

    reference_embeddings = generate_entity_embeddings(groundtruth_entities)
    faulty_embeddings = generate_entity_embeddings(faulty_entities)

    # Detect faults
    faults = detect_faults(reference_embeddings, faulty_embeddings)

    gap_report_lines = []
    improvement_lines = []

    print("Processing specifications...")
    if faults:
        gap_report_lines.append("--- Fault Analysis ---")
        for f in faults:
            fault_topics = [e['text'] for e in f['entities']]
            gap_report_lines.append(f"Fault type: {f['type']}, Topics: {fault_topics}")
        gap_report_lines.append("")

        # Generate improvement suggestions
        suggestions = suggest_improvements_batch(faults, readme_text, fault_description_xml)
        improvement_lines.append("--- Suggestions for Improvement ---")
        for s in suggestions:
            improvement_lines.append(f"Fault Type: {s['fault_type']}")
            improvement_lines.append(f"Topics: {s['topics']}")
            improvement_lines.append("Suggestion:\n" + s['suggestion'] + "\n")
    else:
        gap_report_lines.append("No faults detected.\n")
        improvement_lines.append("No improvements needed.\n")

    with open(report_file_path, "w", encoding="utf-8") as report_file:
        report_file.write("\n".join(gap_report_lines))

    with open(improvements_file_path, "w", encoding="utf-8") as imp_file:
        imp_file.write("\n".join(improvement_lines))

    print("Processing complete. Reports generated.")
    print(f"Gap report: {report_file_path}")
    print(f"Improvement suggestions: {improvements_file_path}")


def _read_text_or_empty(path):
    """Return the stripped UTF-8 content of ``path``, or "" if it does not exist."""
    if not os.path.exists(path):
        return ""
    with open(path, "r", encoding="utf-8") as f:
        return f.read().strip()

# Run the pipeline only when this module is the program entry point, not when
# it is imported by other code.
if __name__ == "__main__":
    main()


  from tqdm.autonotebook import tqdm, trange
2024-12-16 17:29:11.595277: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Processing specifications...
Processing complete. Reports generated.
Gap report: /Users/sunnybhatt/Desktop/BPLLLMDEMO/specifications_gap_report.txt
Improvement suggestions: /Users/sunnybhatt/Desktop/BPLLLMDEMO/specifications_improvements.txt
