From now on, please replace your ChatGPT usage with your GPT API key (GPT-5 model). In this way, you don‚Äôt have to copy and paste things back-and-forth to prompt GPT. Instead, you can import input from a CSV or export output to a CSV file.

https://dl.acm.org/doi/pdf/10.1145/3715772

Please also ensure the temperature setting of the GPT model you use to be 0, to minimize randomness.For each jetbrain rule, please provide the syntax/examples/built-in functions. Ask GPT to output (1) the corresponding RSL rule based on the chosen library/framework, (2) explanation of the newly introduced functions if there is a new function, (3) URL of the third-party data source supporting that rule, (4) summary of the description from the third-party web page supporting that rule, and (5) its (model's) validation of the generated rule based on the jetbrain rule as well as the located third-party data source.
The prompt mimics Chain-of-Thought as it asks GPT to provide supporting facts, and validate the generated rule based on the given fact as well as retrieved support fact.

Please explore to memorize the syntax/examples/built-in functions by calling certain GPT function(s). If this is not possible, please include the syntax and built-in functions in each prompt.

In [1]:
# Global constants - - further automation by feeding a csv file input containing the info blow
FRAMEWORK = "Spring MVC"
TOPIC = "Mismatch in @PathVariable declarations and usages"

SOURCE = "https://www.jetbrains.com.cn/en-us/help/inspectopedia/MVCPathVariableInspection.html"



OUT_DIR = "./runs"
ARTIFACTS_DIR = "./artifacts/Mismatch_in_PathVariable_declarations_and_usages"


In [2]:
# Parameters
FRAMEWORK = "Contexts and Dependency Injection (CDI)"
SOURCE = "https://www.jetbrains.com.cn/en-us/help/inspectopedia/CdiDomBeans.html"
TOPIC = "Incorrect bean definitions in beans.xml\ufeff"
OUT_DIR = "C:\\Users\\spenc\\Downloads\\local\\runs"
ARTIFACTS_DIR = "C:\\Users\\spenc\\Downloads\\local\\artifacts\\Incorrect_bean_definitions_in_beans.xml"


In [3]:
# from google.colab import userdata
# import time, tempfile, os, re, json, requests
# from typing import List, Set, Dict, Optional
# from bs4 import BeautifulSoup
# from selenium import webdriver
# from selenium.webdriver.chrome.options import Options
# import pandas as pd
# from openai import OpenAI


import os, re, json, time, tempfile, requests
from typing import List, Set, Dict, Optional

import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

from openai import OpenAI

from dotenv import load_dotenv
load_dotenv()


try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:

    pass


NUM_RULES = 1
CODEX5_SUGGESTING_NUM_RULES = 0




SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY", "")
# OPENAI_API_KEY  = os.getenv("OPENAI_API_KEY", "")
OPENAI_API_KEY = None



if not OPENAI_API_KEY:
    print("no openaikey")
if not SERPAPI_API_KEY:
    print("no serpapikey")


client = OpenAI()


def serpapi_google(q: str, num: int = 10) -> List[Dict]:
    if not SERPAPI_API_KEY:
        return []
    try:
        params = {"engine": "google", "q": q, "api_key": SERPAPI_API_KEY, "num": str(num)}
        r = requests.get("https://serpapi.com/search", params=params, timeout=20)
        r.raise_for_status()
        return r.json().get("organic_results", [])
    except Exception:
        return []

def locate_inspectopedia_url(framework: str, topic: str) -> Optional[str]:
    """
    Use SerpAPI to find the Inspectopedia page for the given framework/topic.
    Returns the first JetBrains.cn inspectopedia URL it finds.
    """
    queries = [
        f'site:jetbrains.com.cn Inspectopedia {FRAMEWORK} {TOPIC}',
        f'site:jetbrains.com.cn help Inspectopedia {FRAMEWORK} {TOPIC}',
    ]
    print(queries)
    for q in queries:
        for item in serpapi_google(q, num=10):
            url = item.get("link", "")
            if "inspectopedia" in url.lower() and "jetbrains.com.cn" in url.lower():
                print(url)
                return url
    return None

def isSOURCE(url: str) -> bool:
    if locate_inspectopedia_url(FRAMEWORK, TOPIC) == SOURCE:
        return url
    else:
        return SOURCE

def get_jetbrains_description(url: str) -> str:
    opts = Options()
    opts.add_argument("--headless")
    opts.add_argument("--disable-gpu")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-dev-shm-usage")
    opts.add_argument(f"--user-data-dir={tempfile.mkdtemp()}")

    driver = webdriver.Chrome(options=opts)
    driver.get(url)
    time.sleep(3)

    soup = BeautifulSoup(driver.page_source, "html.parser")
    driver.quit()

    span = soup.find("span", class_="sub-title sub-title--related-h1")
    if not span:
        return "(no span marker found)"

    text_parts = []
    for sib in span.find_all_next():
        if sib.name in ("h2", "section"):
            break
        if sib.name == "p":
            text_parts.append(sib.get_text(" ", strip=True))
    return " ".join(text_parts)



no openaikey


In [4]:
# Get the JetBrains inspection description
CONTENT = get_jetbrains_description(SOURCE)

In [5]:
print(CONTENT)

Reports incorrect bean definitions in beans.xml configuration files.


In [6]:
# Newly generated rules expressed in RSL would use only these built-in functions for now to minizme the work of modification of the current engine, MeCheck.

builtins_data = [
    # Code-related
    {"name":"callExists","category":"code","purpose":"Check whether a specific call exists","signature":""},
    {"name":"classExists","category":"code","purpose":"Check whether a class exists","signature":""},
    {"name":"getArg","category":"code","purpose":"Get an argument from a call/method","signature":""},
    {"name":"getClasses","category":"code","purpose":"Get a collection of classes","signature":""},
    {"name":"getConstructors","category":"code","purpose":"Get constructors of a class","signature":""},
    {"name":"getFamily","category":"code","purpose":"Get type hierarchy/family information","signature":""},
    {"name":"getFields","category":"code","purpose":"Get fields of a class","signature":""},
    {"name":"getFQN","category":"code","purpose":"Get fully qualified name","signature":""},
    {"name":"getMethods","category":"code","purpose":"Get methods of a class","signature":""},
    {"name":"getName","category":"code","purpose":"Get the simple name/identifier","signature":""},
    {"name":"getReturnType","category":"code","purpose":"Get a method's return type","signature":""},
    {"name":"getSN","category":"code","purpose":"Get simple name (SN)","signature":""},
    {"name":"getType","category":"code","purpose":"Get type information","signature":""},
    {"name":"hasField","category":"code","purpose":"Check whether a class has a specific field","signature":""},
    {"name":"hasParam","category":"code","purpose":"Check whether a method has a specific parameter","signature":""},
    {"name":"hasParamType","category":"code","purpose":"Check whether a method has a parameter of a given type","signature":""},
    {"name":"indexInBound","category":"code","purpose":"Check whether an index is within bounds","signature":""},
    {"name":"isIterable","category":"code","purpose":"Check whether a type is iterable","signature":""},
    {"name":"isLibraryClass","category":"code","purpose":"Check whether the class comes from libraries","signature":""},
    {"name":"isUniqueSN","category":"code","purpose":"Check whether the simple name is unique","signature":""},
    {"name":"locateClassSN","category":"code","purpose":"Locate a class by simple name","signature":""},
    {"name":"locateClassFQN","category":"code","purpose":"Locate a class by fully qualified name","signature":""},

    # Annotation-related
    {"name":"getAnnoAttr","category":"annotation","purpose":"Get an annotation attribute value","signature":""},
    {"name":"getAnnoAttrNames","category":"annotation","purpose":"Get the set of annotation attribute names","signature":""},
    {"name":"getAnnotated","category":"annotation","purpose":"Get elements annotated with a given annotation","signature":""},
    {"name":"hasAnnotation","category":"annotation","purpose":"Check whether an element has a given annotation","signature":""},
    {"name":"hasAnnoAttr","category":"annotation","purpose":"Check whether an annotation attribute exists","signature":""},

    # XML-related
    {"name":"elementExists","category":"xml","purpose":"Check whether an XML element exists","signature":""},
    {"name":"getAttr","category":"xml","purpose":"Get an XML attribute value","signature":""},
    {"name":"getAttrs","category":"xml","purpose":"Get a set of XML attributes","signature":""},
    {"name":"getElms","category":"xml","purpose":"Get a set of XML elements","signature":""},
    {"name":"getXMLs","category":"xml","purpose":"Get XML documents/fragments","signature":""},
    {"name":"hasAttr","category":"xml","purpose":"Check whether an XML attribute exists","signature":""},

    # Miscellaneous
    {"name":"endsWith","category":"misc","purpose":"Check whether a string ends with a suffix","signature":""},
    {"name":"isEmpty","category":"misc","purpose":"Check whether a string/collection is empty","signature":""},
    {"name":"indexOf","category":"misc","purpose":"Return the index of a substring/element","signature":""},
    {"name":"join","category":"misc","purpose":"Join/concatenate values","signature":""},
    {"name":"pathExists","category":"misc","purpose":"Check whether a filesystem/project path exists","signature":""},
    {"name":"substring","category":"misc","purpose":"Extract a substring","signature":""},
    {"name":"startsWith","category":"misc","purpose":"Check whether a string starts with a prefix","signature":""},
    {"name":"upperCase","category":"misc","purpose":"Convert a string to upper case","signature":""},
]

builtins_df = pd.DataFrame(builtins_data)
builtins_df


Unnamed: 0,name,category,purpose,signature
0,callExists,code,Check whether a specific call exists,
1,classExists,code,Check whether a class exists,
2,getArg,code,Get an argument from a call/method,
3,getClasses,code,Get a collection of classes,
4,getConstructors,code,Get constructors of a class,
5,getFamily,code,Get type hierarchy/family information,
6,getFields,code,Get fields of a class,
7,getFQN,code,Get fully qualified name,
8,getMethods,code,Get methods of a class,
9,getName,code,Get the simple name/identifier,


In [7]:
# === Text Registry (single source of truth) ===
RSL_SYNTAX = r"""Specification := Rule Id Body
Body := '{' Stmt Stmt* '}'
Stmt := ForStmt | IfStmt | AssertStmt | DeclStmt ';'

ForStmt := 'for' '(' Type Id 'in' Exp ')' Body
IfStmt := 'if' '(' Exp ')' Body

AssertStmt := 'assert' '(' Exp ')' '{' MsgStmt ';' '}'
MsgStmt := 'msg' '(' ',' SimExp (',' SimExp)* ')'

DeclStmt := Type Id '=' Exp

Exp := SimExp
     | SimExp AND Exp
     | SimExp OR  Exp
     | NOT Exp

SimExp := Id
        | Lit
        | FunctionCall
        | '(' Exp ')'
        | FunctionCall '==' SimExp
        | exists '(' Type Id in Exp ')' '(' Exp ')'

Type := '‚ü®' Id '‚ü©' | file | class | method | field | String
Lit := StringLit | CharLit | IntLit | FloatLit
FunctionCall := Id '(' Params ')'
Params := SimExp (',' SimExp)*"""

# Sample Rules are Rule #1, #3, and #5. If these change into some other subset, the output of newly generated rule may differ.

RSL_EXAMPLE_RULES = r"""// Rule 1 ‚Äî bean-class-exists
Rule bean-class-exists {
  for (file xml in getXMLs()) {
    if (elementExists(xml, "<bean>")) {
      for (<bean> bean in getElms(xml, "<bean>")) {
        String beanClassFQN = getAttr(bean, "class");
        if (NOT isEmpty(beanClassFQN)) {
          assert ( classExists(beanClassFQN) OR isLibraryClass(beanClassFQN) ) {
            msg("Bean class: %s mentioned in bean: %s, does not exist",
                beanClassFQN, getName(bean));
          }
        }
      }
    }
  }
}

// Rule 3 ‚Äî constructor-arg-name-field-map
Rule constructor-arg-name-field-map {
  for (file xml in getXMLs()) {
    if (elementExists(xml, "<bean>")) {
      for (<bean> bean in getElms(xml, "<bean>")) {
        String beanClassFQN = getAttr(bean, "class");
        if (classExists(beanClassFQN)) {
          class c = locateClassFQN(beanClassFQN);
          for (<constructor-arg> constructor_arg in getElms(bean, "<constructor-arg>")) {
            String arg_name = getAttr(constructor_arg, "name");
            if (NOT isEmpty(arg_name)) {
              assert ( exists(method con in getConstructors(c)) ( hasParam(con, arg_name) ) ) {
                msg("The name of <constructor-arg>: %s in bean: %s does not correspond to any constructor parameter in class: %s",
                    arg_name, getName(bean), getFQN(c));
              }
            }
          }
        }
      }
    }
  }
}

// Rule 5 ‚Äî constructor-index-out-of-bound
Rule constructor-index-out-of-bound {
  for (file xml in getXMLs()) {
    if (elementExists(xml, "<bean>")) {
      for (<bean> bean in getElms(xml, "<bean>")) {
        String beanClassFQN = getAttr(bean, "class");
        if (classExists(beanClassFQN)) {
          class c = locateClassFQN(beanClassFQN);
          for (<constructor-arg> constructor_arg in getElms(bean, "<constructor-arg>")) {
            if (hasAttr(constructor_arg, "index")) {
              String arg_idx = getAttr(constructor_arg, "index");
              assert ( exists(method constructor in getConstructors(c)) ( indexInBound(constructor, arg_idx) ) ) {
                msg("Constructor index: %s of bean for class: %s in xml: %s is out of bound",
                    arg_idx, getFQN(c), getName(xml));
              }
            }
          }
        }
      }
    }
  }
}"""


In [8]:
keyword_message = {"role": "user",
                   "content": f'''
                    Context:
                    Notice the inspection from JetBrains's Inspectopedia relating to
                    the {FRAMEWORK} framework or library -
                    {TOPIC}.
                    Additionally, here is the source for the topic:
                    {SOURCE}

                    Goal:
                    Grab 3 keywords from this topic that would be helpful for
                    searching third party sources relating to this specific inspection.
                    Have the keywords related soley on the framework and topic mentioned
                    rather than the JetBrains or Inspectopedia.
                    Return ONLY the keywords found with no explanation or additional
                    content and return each keyword as a single word or at least without
                    whitespace. Additionally, give each keyword a priority score in terms
                    of which keyword would give the best search results. Each score will
                    be a value between 1-15. Return the keywords and their respective
                    scores in the following format:
                    "keyword_a keyword_b keyword_c\nscore_a score_b score_c"
                    '''}

def get_topic_keywords() -> tuple[list[str], list[int]]:
    api_key = os.getenv("OPENAI_API_KEY", "")
    if not api_key:
        print("‚ö†Ô∏è OPENAI_API_KEY is missing in environment (.env).")
        return [], []

    client = OpenAI(api_key=api_key)
    response = client.chat.completions.create(
        model="gpt-4",
        temperature=0,
        messages=[keyword_message]
    )

    response_text = (response.choices[0].message.content or "").strip()


    lines = response_text.split("\n")
    if len(lines) == 2:
        keywords = lines[0].split()
        try:
            scores = [int(x) for x in lines[1].split()]
        except ValueError:
            print("‚ö†Ô∏è Score line is not integers.")
            return keywords, []
        return keywords, scores

    print("‚ö†Ô∏è Unexpected response format from OpenAI API.")
    return [], []

KEYWORDS, SCORES = get_topic_keywords()
print("Keywords:", KEYWORDS)
print("Scores:", SCORES)

Keywords: ['"CDI"', '"beans.xml"', '"IncorrectBeanDefinitions"']
Scores: [15, 12, 10]


In [9]:
PROMPT_ORIGINAL_TMPL = r"""You are an expert in metadata used in the {{FRAMEWORK}} library/framework.
Here is the core syntax of the language you will translate in plain English natural language per rule. This is the core syntax of Rule-Specific Language (RSL): {{RSL_SYNTAX}}

Here is the currently built-in functions: {{BUILTIN_REFERENCE}}
Here are three example rules expressed in RSL: {{EXAMPLE_RULES}}

As sample rules, I shared three rules with you.
Your job is to create one rule that corresponds to the topic {{TOPIC}} with the following content: {{CONTENT}}.

The JetBrains Inspectopedia source page for this rule is: {{SOURCE}}.
For the field "source URL (JetBrains web page)", output exactly {{SOURCE}}.

Like the sample rules, it should be described following the syntax of RSL. You briefly describe what the new rule checks. The new rule must use only the built-in functions. This means that you should not introduce new built-in functions while creating a new rule corresponding to the topic and content. In the "new_built-in_functions_explanation if any" field, provide names of all built-in functions used in the new rule after confirming that no new functions were introduced. Also, there should not be any comments or BOM (byte order mark) to save the rule in a txt file.

Then, locate the most relevant, at most, two 3rd-party web pages that address the the topic, content, and the generated rule in terms of metadata-related bugs. Provide a brief summary per the 3rd-party web page content in one field together. Finally, provide a post-model validation based on generated rule and the located most relevant 3rd-party web page(s).
Prioritize finding 3rd-party web pages with at least one of the following keywords in the URLs to ensure relevancy: [{{KEYWORDS}}]. The keywords found in the URLs do not have to be exact matches to the keywords in the list.

Return the output strictly as a valid JSON array, not text, not markdown, not explanation.
You must NOT include any text, commentary, or code fences (like ```).
You must NOT prepend or append any text before or after the JSON array.

Each JSON object must include exactly these fields:
[
  "framework",
  "source URL (JetBrains web page)",
  "brief_description of the content in the source URL",
  "GPT-4 generated_rule",
  "generated_rule_explanation from GPT-4",
  "new_built-in_functions_explanation if any",
  "PRE_model_validation",
  "3rd-party most relevant URLs and summary for each URL",
  "POST_model_validation"
]

For the "brief_description of the content in the source URL" field, provide a concise summary of the content provided in the prompt ({{CONTENT}}), which comes from the JetBrains page at {{SOURCE}}. Do NOT describe any other URL.

The output must start with "[" and end with "]".
If you cannot find information for any field, use an empty string ("")."""


In [10]:
def render_builtin_reference(df):
    return "\n".join(f"- {row['name']}" for _, row in df.iterrows())

In [11]:
def _mk_source_suffix(source: str | None) -> str:
    if not source or str(source).strip() == "":
        return ""
    return f" (Reference source: {source})"  # ‚úÖ For now source is hard-coded


def render_original_prompt(
    framework: str = FRAMEWORK,
    num_rules: int = NUM_RULES,
    topic: str = TOPIC,
    source: str | None = SOURCE,
    template_text: str = PROMPT_ORIGINAL_TMPL,
    content: str = CONTENT,
    keywords: str = ", ".join(KEYWORDS)   # ‚Üê Êîπ‰∏∫ÈÄóÂè∑ÂàÜÈöî
):
    """Splicing Final Prompt"""
    return (template_text
            .replace("{{RSL_SYNTAX}}", RSL_SYNTAX)
            .replace("{{EXAMPLE_RULES}}", RSL_EXAMPLE_RULES)
            .replace("{{BUILTIN_REFERENCE}}", render_builtin_reference(builtins_df))
            .replace("{{FRAMEWORK}}", framework)

            .replace("{{SOURCE_SUFFIX}}", _mk_source_suffix(source))
            .replace("{{SOURCE}}", (source or "").strip())  
            .replace("{{NUM_RULES}}", str(num_rules))
            .replace("{{TOPIC}}", topic)
            .replace("{{CONTENT}}", content)
            .replace("{{KEYWORDS}}", keywords))


# === [Block 2] Prompt Generation Combining All Info  ===
final_prompt = render_original_prompt()

print(final_prompt)


You are an expert in metadata used in the Contexts and Dependency Injection (CDI) library/framework.
Here is the core syntax of the language you will translate in plain English natural language per rule. This is the core syntax of Rule-Specific Language (RSL): Specification := Rule Id Body
Body := '{' Stmt Stmt* '}'
Stmt := ForStmt | IfStmt | AssertStmt | DeclStmt ';'

ForStmt := 'for' '(' Type Id 'in' Exp ')' Body
IfStmt := 'if' '(' Exp ')' Body

AssertStmt := 'assert' '(' Exp ')' '{' MsgStmt ';' '}'
MsgStmt := 'msg' '(' ',' SimExp (',' SimExp)* ')'

DeclStmt := Type Id '=' Exp

Exp := SimExp
     | SimExp AND Exp
     | SimExp OR  Exp
     | NOT Exp

SimExp := Id
        | Lit
        | FunctionCall
        | '(' Exp ')'
        | FunctionCall '==' SimExp
        | exists '(' Type Id in Exp ')' '(' Exp ')'

Type := '‚ü®' Id '‚ü©' | file | class | method | field | String
Lit := StringLit | CharLit | IntLit | FloatLit
FunctionCall := Id '(' Params ')'
Params := SimExp (',' SimExp)*

He

In [12]:
print(SOURCE)

https://www.jetbrains.com.cn/en-us/help/inspectopedia/CdiDomBeans.html


In [13]:
# --- Config (local paths) ---
import os, json
from pathlib import Path
ARTIFACT_DIR = Path(ARTIFACTS_DIR)          
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

CSV_PATH = str(ARTIFACT_DIR / "original_rsl_generated_rules.csv")
newRULE_PATH = str(ARTIFACT_DIR / "new_GPT_4_generated_rule.txt")
newRULE_EXPLANATIONS_PATH = str(ARTIFACT_DIR / "new_generated_rule_explanation.json")
GPT4_RULE_DESCRIPTION_PATH = str(ARTIFACT_DIR / "gpt4_rule_description_scraped_content.json")



EXPECTED_COLUMNS = [
    "framework",
    "source URL (JetBrains web page)",
    "brief_description of the content in the source URL",
    "GPT-4 generated_rule",
    "generated_rule_explanation from GPT-4",
    "new_built-in_functions_explanation if any",
    "PRE_model_validation",
    "3rd-party most relevant URLs and summary for each URL",
    "POST_model validation",
]


# --- OpenAI client (no explicit temperature; model default applies) ---
api_key = os.getenv("OPENAI_API_KEY", "")
if not api_key:
    raise RuntimeError("OPENAI_API_KEY not found. Put it in your .env or system env.")

client = OpenAI(api_key=api_key)

response = client.chat.completions.create(
    model="gpt-4",
    temperature=0,
    messages=[{"role": "user", "content": final_prompt}]
)

raw = response.choices[0].message.content.strip()
print("Returned data preview:\n", raw[:500])
rules = json.loads(raw)

# --- Normalize to your schema (fill missing fields with empty strings) ---
def coerce_to_schema(item: dict) -> dict:
    # Try to map common alternative keys if your prompt returns slightly different names
    aliases = {
        "framework": ["framework"],
        "source URL (JetBrains web page)": ["source_url", "source URL", "jetbrains_source", "jetbrains_url"],
        "brief_description of the content in the source URL": ["brief_description", "brief description", "source_brief"],
        "GPT-4 generated_rule": ["generated_rule", "rule", "gpt4_rule"],
        "generated_rule_explanation from GPT-4": ["generated_rule_explanation", "rule_explanation", "gpt4_rule_explanation"],
        "new_built-in_functions_explanation if any": ["new_built_in_functions_explanation", "new_built-in_functions_explanation", "new_functions_note"],
        "PRE_model_validation": ["pre_model_validation", "pre_validation"],
        "3rd-party most relevant URLs and summary for each URL": ["third_party_urls_and_summaries", "3rd_party_urls_summaries"],
        "POST_model validation": ["post_model_validation", "post_validation"],
    }

    out = {}
    for col in EXPECTED_COLUMNS:
        val = ""
        if isinstance(item, dict):
            if col in item:
                val = item[col]
            else:
                # look for an alias key
                for alt in aliases.get(col, []):
                    if alt in item:
                        val = item[alt]
                        break
        out[col] = val if val is not None else ""
    return out

rows = [coerce_to_schema(x if isinstance(x, dict) else {}) for x in (rules if isinstance(rules, list) else [rules])]

df = pd.DataFrame(rows, columns=EXPECTED_COLUMNS)

# --- Save column ‚ÄúGPT-4 generated_rule‚Äù to TXT (one per line) ---
generated_rules_txt = [str(x).strip() for x in df["GPT-4 generated_rule"].fillna("")]
with open(newRULE_PATH, "w", encoding="utf-8") as f:
    for line in generated_rules_txt:
        if line:
            f.write(line + "\n")
print(f"‚úÖ Saved {len([x for x in generated_rules_txt if x])} rules to {newRULE_PATH}")

# --- Save column ‚Äúgenerated_rule_explanation from GPT-4‚Äù to JSON (list of strings) ---

generated_explanations = [str(x).strip() for x in df["generated_rule_explanation from GPT-4"].fillna("") if str(x).strip()]
single_explanation = generated_explanations[0] if generated_explanations else ""
try:
    os.makedirs(os.path.dirname(newRULE_EXPLANATIONS_PATH), exist_ok=True)
    with open(newRULE_EXPLANATIONS_PATH, "w", encoding="utf-8") as f:
        json.dump(
            {"new_generated_rule_explanation": single_explanation},
            f,
            ensure_ascii=False,
            indent=2
        )
    print(f"‚úÖ Saved new_generated_rule_explanation to {newRULE_EXPLANATIONS_PATH}")
except Exception as e:
    print(f"‚ùå Failed to save JSON: {e}")

with open(newRULE_EXPLANATIONS_PATH, "w", encoding="utf-8") as f:
    json.dump(generated_explanations, f, ensure_ascii=False, indent=2)

try:
    os.makedirs(os.path.dirname(newRULE_EXPLANATIONS_PATH), exist_ok=True)
    with open(newRULE_EXPLANATIONS_PATH, "w", encoding="utf-8") as f:
        json.dump(
            {"new_generated_rule_explanation": single_explanation},
            f,
            ensure_ascii=False,
            indent=2
        )
    print(f"‚úÖ Saved new_generated_rule_explanation to {newRULE_EXPLANATIONS_PATH}")
except Exception as e:
    print(f"‚ùå Failed to save JSON: {e}")



def load_new_rule_explanation(path: str = GPT4_RULE_DESCRIPTION_PATH) -> str:
    """
    Loads the previously saved GPT-5 rule description for contextual use in SerpAPI searches.
    Returns an empty string if none exists or file is corrupted.
    """
    if not os.path.exists(path):
        print("‚ö†Ô∏è No saved GPT-4 rule description found.")
        return ""
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data.get("jetbrains_scraped_rule_description", "")
    except Exception as e:
        print(f"‚ö†Ô∏è Failed to load saved description: {e}")
        return ""

def load_gpt4_rule_description(path: str = newRULE_EXPLANATIONS_PATH) -> str:
    """
    Loads the new-GPT-4-generated rule explanation to provide context to SerpAPI later.
    Returns an empty string if none exists or file is corrupted.
    """
    if not os.path.exists(path):
        print("‚ö†Ô∏è No new-GPT-4 rule explanation found.")
        return ""
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data.get("new_generated_rule_explanation", "")
    except Exception as e:
        print(f"‚ö†Ô∏è Failed to load saved description: {e}")
        return ""


# Step 2: Load later (e.g., in a new session)
loaded_text = load_new_rule_explanation(GPT4_RULE_DESCRIPTION_PATH)
print("\n--- Loaded back from JSON scraped from JetBrains  ---\n")
print(loaded_text[:500], "...")

# Step 3: Load later (e.g., in a new session)
loaded_text1 = load_gpt4_rule_description(newRULE_EXPLANATIONS_PATH)
print("\n--- Loaded back from JSON from gpt4_rule_explanation ---\n")
print(loaded_text1[:500], "...")

Returned data preview:
 [
  {
    "framework": "Contexts and Dependency Injection (CDI)",
    "source URL (JetBrains web page)": "https://www.jetbrains.com.cn/en-us/help/inspectopedia/CdiDomBeans.html",
    "brief_description of the content in the source URL": "The JetBrains Inspectopedia page provides information about the CDI Dom Beans inspection, which reports incorrect bean definitions in beans.xml configuration files.",
    "GPT-4 generated_rule": "Rule incorrect-bean-definitions {\n  for (file xml in getXMLs()) {
‚úÖ Saved 1 rules to C:\Users\spenc\Downloads\local\artifacts\Incorrect_bean_definitions_in_beans.xml\new_GPT_4_generated_rule.txt
‚úÖ Saved new_generated_rule_explanation to C:\Users\spenc\Downloads\local\artifacts\Incorrect_bean_definitions_in_beans.xml\new_generated_rule_explanation.json
‚úÖ Saved new_generated_rule_explanation to C:\Users\spenc\Downloads\local\artifacts\Incorrect_bean_definitions_in_beans.xml\new_generated_rule_explanation.json
‚ö†Ô∏è No saved GPT-4 

In [14]:
#--- Finally, save the full normalized CSV ---
df.to_csv(CSV_PATH, index=False)
print(f"‚úÖ Saved normalized data to {CSV_PATH}")

‚úÖ Saved normalized data to C:\Users\spenc\Downloads\local\artifacts\Incorrect_bean_definitions_in_beans.xml\original_rsl_generated_rules.csv


In [15]:
# Paths & column name that should be excluded for SerpAPI search
CSV_URLS_COLUMN = "3rd-party most relevant URLs and summary for each URL"

EXCLUDED_DOMAINS = {
    "jetbrains.com",
    "www.jetbrains.com",
    "jetbrains.com.cn",
    "www.jetbrains.com.cn",
    "example.com",
    "example.org"
}

from pathlib import Path
ARTIFACT_DIR = Path(ARTIFACTS_DIR)
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

GPT_URLS_STORE = str(ARTIFACT_DIR / "gpt4_found_urls.json")


# URL extractor
_URL_RE = re.compile(r"https?://[^\s)>\]}\"']+", re.IGNORECASE)

def _filter_real_urls(urls: Set[str]) -> Set[str]:
    """Remove JetBrains and example/test domains."""
    cleaned = set()
    for url in urls:
        lowered = url.lower()
        if any(dom in lowered for dom in EXCLUDED_DOMAINS):
            continue
        cleaned.add(url.strip())
    return cleaned

def extract_urls(text: str) -> Set[str]:
    return set(_URL_RE.findall(text or ""))

def load_urls_from_csv(csv_path: str = CSV_PATH,
                       column: str = CSV_URLS_COLUMN) -> Set[str]:
    """
    Reads the CSV at csv_path and extracts all URLs from the target column.
    Returns a set of URLs (may be empty if file/column missing).
    """
    try:
        df = pd.read_csv(csv_path)
    except Exception as e:
        print(f"‚ö†Ô∏è Could not read CSV at {csv_path}: {e}")
        return set()

    if column not in df.columns:
        print(f"‚ö†Ô∏è Column not found: '{column}'. Available: {list(df.columns)}")
        return set()

    urls: Set[str] = set()
    for cell in df[column].dropna().astype(str):
        urls |= extract_urls(cell)
    return urls

def load_gpt5_found_urls(path: str = GPT_URLS_STORE,
                         csv_path: str = CSV_PATH,
                         column: str = CSV_URLS_COLUMN) -> Set[str]:
    """
    Loads URLs previously saved in JSON and merges with URLs extracted
    from the CSV column.
    """
    merged: Set[str] = set()

    # 1) URLs from CSV
    csv_urls = load_urls_from_csv(csv_path, column)
    if csv_urls:
        merged |= csv_urls

    # 2) URLs from JSON store (if present)
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                merged |= set(map(str, data))
            else:
                print("‚ö†Ô∏è JSON store not a list; ignoring its contents.")
        except Exception as e:
            print(f"‚ö†Ô∏è Failed to load {path}: {e}")

    return merged

def save_gpt5_found_urls(
    urls: Set[str],
    path: str = GPT_URLS_STORE,
    also_merge_csv: bool = True,
    csv_path: str = CSV_PATH,
    column: str = CSV_URLS_COLUMN
) -> None:
    """
    Saves the union of:
      - provided `urls`
      - existing JSON store
      - (optionally) URLs parsed from the CSV column
    """
    # 1Ô∏è‚É£ Merge URLs
    existing = load_gpt5_found_urls(path, csv_path, column) if also_merge_csv else set()
    valid_urls = _filter_real_urls(urls)
    merged = sorted(existing | valid_urls)

    # 2Ô∏è‚É£ Save merged URLs to file
    try:
        with open(path, "w", encoding="utf-8") as f:
            json.dump(merged, f, ensure_ascii=False, indent=2)
        print(f"‚úÖ Saved {len(merged)} URLs to {path}")
    except Exception as e:
        print(f"‚ùå Failed to save {path}: {e}")
        return


    # before_count = len(EXCLUDED_DOMAINS)
    # EXCLUDED_DOMAINS.update(existing)
    # after_count = len(EXCLUDED_DOMAINS)
    # print(f"üîí Updated EXCLUDED_DOMAINS: added {after_count - before_count} new URL(s).")
    print(f"üß± EXCLUDED_DOMAINS (static) = {EXCLUDED_DOMAINS}")

# 1) Pull URLs from CSV + existing JSON
seed_urls = load_gpt5_found_urls()

# 2) Add any new URLs (e.g., from a fresh GPT-4 output or another pass)
new_urls = {"https://example.com/a", "https://example.org/b"}
save_gpt5_found_urls(new_urls)   # merges CSV + existing JSON + new ones into the store

# 3) Later: build your exclusion set from this store for SerpAPI searches
gpt5_found_urls = load_gpt5_found_urls()


‚úÖ Saved 2 URLs to C:\Users\spenc\Downloads\local\artifacts\Incorrect_bean_definitions_in_beans.xml\gpt4_found_urls.json
üß± EXCLUDED_DOMAINS (static) = {'example.org', 'jetbrains.com.cn', 'www.jetbrains.com.cn', 'jetbrains.com', 'www.jetbrains.com', 'example.com'}


In [16]:
from typing import Iterable
from urllib.parse import urlparse

DEFAULT_EXCLUDES = {
    "jetbrains.com",
    "www.jetbrains.com",
    "jetbrains.com.cn",
    "www.jetbrains.com.cn",
}

# More automation number 5
HOST_PRIORITY: Dict[str, int] = {
    "docs.jboss.org": 100,             # Hibernate javadocs
    "spring.io": 95,               # Hibernate website
    "jakarta.ee": 80,                  # EE docs
    "jetbrains.com": 70,               # Inspectopedia/IDEA docs
    "github.com": 40,                  # Code often useful
}

def _fetch_text(url: str, timeout: int = 20) -> str:
    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    for tag in soup(["script","style","noscript"]):
        tag.decompose()
    text = soup.get_text(separator="\n")
    text = re.sub(r"\n\s*\n+", "\n\n", text).strip()
    return text

def search_framework_find(framework: str, topic: str,
                          jetbrains_intro: str, gpt4_expl: str,
                          exclude_domains: Optional[Set[str]] = None,
                          per_query: int = 10) -> List[dict]:
    queries = build_candidate_queries(framework, topic, jetbrains_intro, gpt4_expl)

    all_results: List[dict] = []
    for q in queries:
        batch = serpapi_search(q, exclude_domains=exclude_domains, num=per_query)
        all_results.extend(batch)
        time.sleep(0.2)  # be polite; SerpAPI handles rate limiting but avoid bursts

    # Deduplicate by URL
    dedup: Dict[str, dict] = {}
    for r in all_results:
        dedup[r["url"]] = r
    ranked = rerank(list(dedup.values()))
    return ranked[:20]  # top-N

def serpapi_search(query: str, exclude_domains: Optional[Set[str]] = None, num: int = 10) -> List[dict]:
    """
    Returns a *list of result dicts* (we keep title/snippet for reranking).
    """
    if not SERPAPI_API_KEY:
        print("‚ö†Ô∏è No SerpAPI key provided.")
        return []

    excludes = set(DEFAULT_EXCLUDES)
    if exclude_domains:
        excludes |= set(exclude_domains)

    params = {"engine": "google", "q": query, "api_key": SERPAPI_API_KEY, "num": str(num)}
    try:
        r = requests.get("https://serpapi.com/search", params=params, timeout=20)
        r.raise_for_status()
        data = r.json()
        out = []
        for item in data.get("organic_results", []):
            url = item.get("link")
            if not url:
                continue
            if is_excluded(url, excludes):
                continue
            out.append({
                "url": url,
                "title": item.get("title", ""),
                "snippet": item.get("snippet", ""),
                "position": item.get("position"),
            })
        return out
    except Exception as e:
        print(f"‚ö†Ô∏è SerpAPI request failed: {e}")
        return []

def is_excluded(url: str, excluded_domains: Set[str]) -> bool:
    """Return True if the URL's host matches any excluded domain."""
    if not excluded_domains:
        return False
    host = _extract_domain(url)
    return any(host == dom or host.endswith(f".{dom}") for dom in excluded_domains)

def build_candidate_queries(framework: str, topic: str, jetbrains_intro: str, gpt5_expl: str) -> List[str]:
    """
    Generate several strong Google queries with operators. We‚Äôll try them all and then rerank results.
    """
    ctx = normalize_space(" ".join([framework or "", topic or "", jetbrains_intro or "", gpt5_expl or ""]))
    terms = key_terms_from_text(ctx)
    joined = " ".join(terms) if terms else ctx

    # Pay special attention to '@Find' literal
    base = '"@PathVariable" annotation attribute spring mvc'

    # More automation number 1.
    candidates = [
        # precision: official javadocs
        f'site:docs.jboss.org {base}',
        f'site:spring.io {base}',
        # title/url hints
        f'intitle:{base}',
        f'inurl:spring intitle:PathVariable',
        # allintext to bind concepts
        f'allintext:PathVariable spring {joined}',
        # combine with JetBrains if trying to verify Inspectopedia rule
        f'site:jetbrains.com Inspectopedia "PathVariable Spring MVC',
        # fallback broad
        f'{base} {joined}',
    ]

    # de-duplicate + keep short, valid strings
    return uniq_preserve_order([normalize_space(c) for c in candidates if c.strip()])

def _extract_domain(url: str) -> str:
    try:
        host = re.sub(r"^https?://", "", url, flags=re.IGNORECASE).split("/")[0]
        return host.lower()
    except Exception:
        return ""

def normalize_space(s: str) -> str:
    return re.sub(r"\s+", " ", s or "").strip()

def key_terms_from_text(text: str, k: int = 8) -> List[str]:
    """
    Cheap, dependency-free keyterm picker: keep tokens with @, camelCase, or java-ish/Dot terms; fallback to frequent words.
    Replace with RAKE/TextRank if you like.
    """
    if not text:
        return []
    toks = re.findall(r"[@\w\.]+", text)
    # keep distinctive tokens
    candidates = [t for t in toks if len(t) > 2]
    # light heuristic boosts
    def score(t: str) -> float:
        s = 0.0
        if t.startswith("@"): s += 3
        if "." in t: s += 1.5   # package/class names
        if re.search(r"[A-Z][a-z]+[A-Z]", t): s += 1.0  # camelCase

        # More automation number 2.
        if t.lower() in {"pathvariable","requestmapping","spring","springmvc","incorrect","mismatch"}:
            s += 1.2
        return s
    ranked = sorted(candidates, key=lambda t: (score(t), len(t)), reverse=True)
    out = uniq_preserve_order(ranked)[:k]
    return out

def uniq_preserve_order(seq: Iterable[str]) -> List[str]:
    seen = set()
    out = []
    for s in seq:
        if s not in seen:
            seen.add(s)
            out.append(s)
    return out

def rerank(results: List[dict]) -> List[dict]:
    return sorted(results, key=feature_score, reverse=True)

# More automation number 3
def feature_score(result: dict) -> float:
    """
    Score by: host priority, presence of '@Find' in title/snippet/url,
    path hints like '/annotations/processing/Find'.
    """
    url, title, snip = result.get("url",""), result.get("title",""), result.get("snippet","")
    host = host_of(url)
    score = HOST_PRIORITY.get(host, 0)

    text = " ".join([url, title, snip]).lower()
    boosts = [(word, score) for word, score in zip(KEYWORDS, SCORES)]
    for pat, w in boosts:
        if re.search(pat, text):
            score += w

    # slight boost for earlier rank
    pos = result.get("position")
    if isinstance(pos, int):
        score += max(0, 10 - pos)

    return float(score)

# -------------------------------
# Helpers
# -------------------------------
def host_of(url: str) -> str:
    try:
        return urlparse(url).netloc.lower()
    except Exception:
        return ""

def is_excluded(url: str, excluded_hosts: Set[str]) -> bool:
    h = host_of(url)
    return any(h == ex or h.endswith(f".{ex}") for ex in excluded_hosts)

def uniq_preserve_order(seq: Iterable[str]) -> List[str]:
    seen = set()
    out = []
    for s in seq:
        if s not in seen:
            seen.add(s)
            out.append(s)
    return out

# -------------------------------
# Query building
# -------------------------------
# def normalize_space(s: str) -> str:
#     return re.sub(r"\s+", " ", s or "").strip()

# def key_terms_from_text(text: str, k: int = 8) -> List[str]:
#     """
#     Cheap, dependency-free keyterm picker: keep tokens with @, camelCase, or java-ish/Dot terms; fallback to frequent words.
#     Replace with RAKE/TextRank if you like.
#     """
#     if not text:
#         return []
#     toks = re.findall(r"[@\w\.]+", text)
#     # keep distinctive tokens
#     candidates = [t for t in toks if len(t) > 2]
#     # light heuristic boosts
#     def score(t: str) -> float:
#         s = 0.0
#         if t.startswith("@"): s += 3
#         if "." in t: s += 1.5   # package/class names
#         if re.search(r"[A-Z][a-z]+[A-Z]", t): s += 1.0  # camelCase
#         if t.lower() in {"find","finder","annotation","hibernate","entity","typedquery","selectionquery"}:
#             s += 1.2
#         return s
#     ranked = sorted(candidates, key=lambda t: (score(t), len(t)), reverse=True)
#     out = uniq_preserve_order(ranked)[:k]
#     return out

# def build_candidate_queries(framework: str, topic: str, jetbrains_intro: str, gpt5_expl: str) -> List[str]:
#     """
#     Generate several strong Google queries with operators. We‚Äôll try them all and then rerank results.
#     """
#     ctx = normalize_space(" ".join([framework or "", topic or "", jetbrains_intro or "", gpt5_expl or ""]))
#     terms = key_terms_from_text(ctx)
#     joined = " ".join(terms) if terms else ctx

#     # Pay special attention to '@Find' literal
#     base = '"@Find" hibernate annotation'

#     candidates = [
#         # precision: official javadocs
#         f'site:docs.jboss.org {base}',
#         f'site:hibernate.org {base}',
#         # title/url hints
#         f'intitle:@Find hibernate annotation',
#         f'inurl:hibernate intitle:@Find annotation',
#         # allintext to bind concepts
#         f'allintext:@Find hibernate entity finder {joined}',
#         # combine with JetBrains if trying to verify Inspectopedia rule
#         f'site:jetbrains.com Inspectopedia "@Find" hibernate',
#         # fallback broad
#         f'{base} {joined}',
#     ]

#     # de-duplicate + keep short, valid strings
#     return uniq_preserve_order([normalize_space(c) for c in candidates if c.strip()])

# -------------------------------
# Lightweight reranking
# # -------------------------------
# def feature_score(result: dict) -> float:
#     """
#     Score by: host priority, presence of '@Find' in title/snippet/url,
#     path hints like '/annotations/processing/Find'.
#     """
#     url, title, snip = result.get("url",""), result.get("title",""), result.get("snippet","")
#     host = host_of(url)
#     score = HOST_PRIORITY.get(host, 0)

#     text = " ".join([url, title, snip]).lower()

#     # boosts = [
#     #     (r"@configmapping", 15),
#     #     (r"\bquarkus\b", 8),
#     #     (r"\bannotation(s)?\b", 5),
#     #     (r".io/smallrye-config/", 12),
#     #     (r"/smallrye/", 10),
#     #     (r"\bprefix\b", 4),
#     # ]
#     for pat, w in boosts:
#         if re.search(pat, text):
#             score += w

#     # slight boost for earlier rank
#     pos = result.get("position")
#     if isinstance(pos, int):
#         score += max(0, 10 - pos)

#     return float(score)

def rerank(results: List[dict]) -> List[dict]:
    return sorted(results, key=feature_score, reverse=True)

# -------------------------------
# Orchestrator
# -------------------------------
def search__find(framework: str, topic: str,
                          jetbrains_intro: str, gpt4_expl: str,
                          exclude_domains: Optional[Set[str]] = None,
                          per_query: int = 10) -> List[dict]:
    queries = build_candidate_queries(framework, topic, jetbrains_intro, gpt4_expl)

    all_results: List[dict] = []
    for q in queries:
        batch = serpapi_search(q, exclude_domains=exclude_domains, num=per_query)
        all_results.extend(batch)
        time.sleep(0.2)  # be polite; SerpAPI handles rate limiting but avoid bursts

    # Deduplicate by URL
    dedup: Dict[str, dict] = {}
    for r in all_results:
        dedup[r["url"]] = r
    ranked = rerank(list(dedup.values()))
    return ranked[:20]  # top-N


In [17]:
# --- Prereqs --------------------------------------------------------------
# - serpapi_search() helper OR inline SerpAPI call
# - SOURCE / FRAMEWORK / TOPIC and your intro loader
# --- THESE TWO APPROACHES WORKED --------------------------------------------------------------

from openai import OpenAI
import os, json, re
from urllib.parse import urlparse  


client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))

# A thin wrapper that also enforces your exclusions.
def serpapi_search_excluding(q: str, exclude: set, num: int = 10) -> list[str]:

    merged_excludes = set(exclude) | {
        "jetbrains.com", "www.jetbrains.com",
        "jetbrains.com.cn", "www.jetbrains.com.cn",
    }
    results = serpapi_search(q, exclude_domains=merged_excludes, num=num)

    urls = [r.get("url", "") for r in results if r.get("url")]
    return urls[:num]

# --- 1) Define the tool schema the model can call ---------------------------
tools = [
    {
        "type": "function",
        "function": {
            "name": "serpapi_search_tool",
            "description": "Run a Google search via SerpAPI and return third-party URLs (JetBrains excluded).",
            "parameters": {
                "type": "object",
                "properties": {
                    "q": {"type": "string", "description": "The exact Google query to run."},
                    "max": {"type": "integer", "description": "Max URLs to return (<=10).", "default": 5},
                    "exclude": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Full URLs or domains to exclude"
                    }
                },
                "required": ["q"]
            }
        }
    }
]

# --- 2) Prepare the conversation -------------------------------------------
def source_intro_before_locating(url: str) -> str:
    txt = _fetch_text(url)
    m = re.search(r"Locating\s+this\s+inspection", txt, flags=re.IGNORECASE)
    return txt[:m.start()].strip() if m else txt.strip()

def build_context(rule: dict | str) -> str:
    gpt4_desc = load_gpt4_rule_description()  # may be ""
    rule_text = (rule.get("rule") or rule.get("name") or rule.get("description")) if isinstance(rule, dict) else str(rule)
    intro = source_intro_before_locating(SOURCE)
    context = re.sub(r"\s+", " ", f"{FRAMEWORK} {intro} {gpt4_desc} {rule_text} {TOPIC}").strip()
    return context

system_msg = {
    "role": "system",
    "content": (
        "You are an analyst. Decide the BEST Google query for SerpAPI to find a NEW third-party page that "
        "specifically addresses the rule context. Prefer queries that are precise (use allintext/intitle if helpful). "
        "Do NOT return JetBrains links. When you need to search, CALL the serpapi_search_tool exactly once with your query."
    )
}

def ask_model_to_search(rule, excluded: set):
    user_msg = {
        "role": "user",
        "content": (
            "Context for the rule:\n"
            f"{build_context(rule)}\n\n"
            "Goal: Find ONE new relevant third-party URL that discusses this rule/topic. "
            "Return the single best URL and a one-sentence why-it‚Äôs-relevant."
        )
    }

    # --- 3) First turn: model decides the query and issues a tool call ------
    first = client.chat.completions.create(
        model="gpt-5",
        messages=[system_msg, user_msg],
        tools=tools,
        tool_choice="auto"
    )

    msg = first.choices[0].message
    tool_calls = getattr(msg, "tool_calls", None)

    if not tool_calls:
        return {"url": "", "rationale": msg.content or ""}

    tc = tool_calls[0]
    if tc.function.name != "serpapi_search_tool":
        return {"url": "", "rationale": "Model called unexpected tool."}

    # Parse tool args
    args = json.loads(tc.function.arguments or "{}")
    q = args.get("q", "")
    k = int(args.get("max", 5))
    ex = set(args.get("exclude") or set()) | excluded

    # Run the tool (SerpAPI) ourselves
    urls = serpapi_search_excluding(q, ex, num=min(k, 10))

    # --- 4) Send tool results back to the model -----------------------------
    tool_result_msg = {
        "role": "tool",
        "tool_call_id": tc.id,
        "name": "serpapi_search_tool",
        "content": json.dumps({"query": q, "results": urls}, ensure_ascii=False)
    }

    final = client.chat.completions.create(
        model="gpt-5",
        messages=[system_msg, user_msg, msg, tool_result_msg]
    )

    out = final.choices[0].message.content.strip() if final.choices else ""
    m = re.search(r"(https?://\S+)", out)
    best_url = m.group(1) if m else (urls[0] if urls else "")
    return {"url": best_url, "rationale": out}

# --- 5) Usage ---------------------------------------------------------------
# Build the live exclusion set (exact URLs + domains)
gpt5_found_urls = load_gpt5_found_urls()  # your persisted set
current_exclusions = set(EXCLUDED_DOMAINS) | set(gpt5_found_urls)


result = ask_model_to_search(rule={"rule": TOPIC}, excluded=current_exclusions)
print("Chosen URL:", result["url"])
print("Model rationale:\n", result["rationale"])

if result["url"]:
    save_gpt5_found_urls({result["url"]})

framework = FRAMEWORK
topic = TOPIC
jetbrains_intro = load_new_rule_explanation(GPT4_RULE_DESCRIPTION_PATH)
gpt5_expl = load_gpt4_rule_description(newRULE_EXPLANATIONS_PATH)

top = search_framework_find(framework, topic, jetbrains_intro, gpt5_expl)
for r in top[:5]:
    print(r["url"], "‚Äî", r["title"])


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=CDI+beans.xml+alternatives+%3Cclass%3E+fully+qualified+name+exists&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=5


Chosen URL: https://docs.jboss.org/weld/reference/latest/en-US/html/configure.html#_beans_xml
Model rationale:
 https://docs.jboss.org/weld/reference/latest/en-US/html/configure.html#_beans_xml

Why it‚Äôs relevant: This Weld reference page explains the beans.xml structure and that class entries (e.g., for alternatives/interceptors/decorators) use fully qualified class names that must correspond to existing classes, matching what the incorrect-bean-definitions rule checks.
‚úÖ Saved 3 URLs to C:\Users\spenc\Downloads\local\artifacts\Incorrect_bean_definitions_in_beans.xml\gpt4_found_urls.json
üß± EXCLUDED_DOMAINS (static) = {'example.org', 'jetbrains.com.cn', 'www.jetbrains.com.cn', 'jetbrains.com', 'www.jetbrains.com', 'example.com'}
‚ö†Ô∏è No saved GPT-4 rule description found.


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=site%3Adocs.jboss.org+%22%40PathVariable%22+annotation+attribute+spring+mvc&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=site%3Aspring.io+%22%40PathVariable%22+annotation+attribute+spring+mvc&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=intitle%3A%22%40PathVariable%22+annotation+attribute+spring+mvc&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=inurl%3Aspring+intitle%3APathVariable&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=allintext%3APathVariable+spring+beans.xml+elements.+class.+bean.+Incorrect+incorrect+definitions+Dependency&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=site%3Ajetbrains.com+Inspectopedia+%22PathVariable+Spring+MVC&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


‚ö†Ô∏è SerpAPI request failed: 429 Client Error: Too Many Requests for url: https://serpapi.com/search?engine=google&q=%22%40PathVariable%22+annotation+attribute+spring+mvc+beans.xml+elements.+class.+bean.+Incorrect+incorrect+definitions+Dependency&api_key=7d0961db7173e1210a6c719fff5e1a4a785c071ca300e2676cdd7f2b3e2b61b6&num=10


In [18]:
# --- Rule Validation Utilities (Using SerpAPI) ---
# --- Naive approach of combining all contexts in text form
import html

def _extract_domain_u(url: str) -> str:
    try:
        host = re.sub(r"^https?://", "", url, flags=re.IGNORECASE).split("/")[0]
        return host.lower()
    except Exception:
        return ""

def is_excluded_u(url: str, excluded_domains: Set[str]) -> bool:
    """Return True if the URL's host matches any excluded domain."""
    if not excluded_domains:
        return False
    host = _extract_domain_u(url)
    return any(host == dom or host.endswith(f".{dom}") for dom in excluded_domains)

def serpapi_search_urls(query: str, exclude_domains: Optional[Set[str]] = None, num: int = 10) -> List[str]:
    """
    Searches Google via SerpAPI and returns URLs that are *not* excluded.
    """
    if not SERPAPI_API_KEY:
        print("‚ö†Ô∏è No SerpAPI key provided.")
        return []

    try:
        params = {"engine": "google", "q": query, "api_key": SERPAPI_API_KEY, "num": str(num)}
        r = requests.get("https://serpapi.com/search", params=params, timeout=20)
        r.raise_for_status()
        data = r.json()
        out: List[str] = []
        for item in data.get("organic_results", []):
            url = item.get("link")
            if url and not is_excluded_u(url, EXCLUDED_DOMAINS):
                out.append(url)
        return out
    except Exception:
        print("‚ö†Ô∏è SerpAPI request failed.")
        return []

# def build_query_from_rule(rule: dict | str) -> str:
#     """
#     Build a SerpAPI query using:
#       - The SOURCE intro (before 'Locating this inspection')
#       - The saved GPT-4 rule description (if available)
#       - FRAMEWORK and TOPIC context
#     Adds Google's 'allintext:' operator to focus on pages that contain all contextual words.
#     """
#     # 1Ô∏è‚É£ Load saved GPT-4 rule description (if any)
#     _ = load_gpt4_rule_description()

#     # 2Ô∏è‚É£ Get Inspectopedia intro (before 'Locating this inspection')
#     intro = source_intro_before_locating(SOURCE)

#     # 3Ô∏è‚É£ Get explanation of the new generated rule
#     newR = load_gpt4_rule_description(newRULE_EXPLANATIONS_PATH)

#     # 4Ô∏è‚É£ Combine minimal context
#     combined_context = f"{FRAMEWORK} {intro} {newR} {TOPIC}"
#     combined_context = re.sub(r"\s+", " ", combined_context).strip()
#     print("Combined context:", combined_context)

#     # 5Ô∏è‚É£ Return a valid Google query
#     return f'allintext:("{TOPIC}" {combined_context})'


def build_query_from_rule(rule: dict | str) -> str:

    terms = ['"@PathVariable"', '"Spring MVC"', "mismatch"]

    return f'allintext:({" ".join(terms)})'



def validate_rule_via_serpapi(rule_text: str,
                              excluded_domains: Optional[Set[str]] = None,
                              max_urls: int = 3) -> Dict[str, object]:
    """
    Uses SerpAPI with an allintext-based query for higher relevancy.
    Returns up to `max_urls` URLs excluding JetBrains and other specified domains.
    """
    q = build_query_from_rule(rule_text)
    print(f"üîç SerpAPI query:\n{q}\n")

    urls = serpapi_search_urls(q, excluded_domains or set())

    if not urls:
        print("‚ö†Ô∏è No URLs found ‚Äî query may be too specific or context too long. Try shortening the input.")
    else:
        print(f"‚úÖ Found {len(urls)} URLs:")
        for u in urls[:max_urls]:
            print(" ‚Ä¢", u)

    return {"query": q, "urls": urls[:max_urls]}


query = build_query_from_rule({"content": CONTENT})
r = validate_rule_via_serpapi(query)
urls_only = r["urls"]
print(urls_only)


üîç SerpAPI query:
allintext:("@PathVariable" "Spring MVC" mismatch)



‚ö†Ô∏è SerpAPI request failed.
‚ö†Ô∏è No URLs found ‚Äî query may be too specific or context too long. Try shortening the input.
[]


In [19]:


# Appending SerpAPI-found-URL into the CSV file (local)
from pathlib import Path
import os, json
import pandas as pd




NEW_URLS_COLUMN = "SerpAPI-found URLs"

def append_serpapi_urls_to_csv(serpapi_output: dict,
                               csv_path: str = CSV_PATH,
                               column: str = NEW_URLS_COLUMN):

    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"‚ùå CSV not found at {csv_path}. Please verify the path.")

    df = pd.read_csv(csv_path)

    if column not in df.columns:
        df[column] = ""


    new_urls = list(serpapi_output.get("urls", []) or [])
    if not new_urls:
        print("‚ö†Ô∏è No URLs found in SerpAPI output ‚Äî nothing to append.")
        return df

    new_urls_str = "; ".join(new_urls)

    if len(df) > 0:
        last_idx = df.index[-1]
        existing = str(df.at[last_idx, column])
        if existing and isinstance(existing, str):
            combined = set(existing.split("; ")) | set(new_urls)
            df.at[last_idx, column] = "; ".join(sorted(combined))
        else:
            df.at[last_idx, column] = new_urls_str
    else:
        df = pd.DataFrame([{column: new_urls_str}])

    df.to_csv(csv_path, index=False)
    print(f"‚úÖ Appended {len(new_urls)} SerpAPI URL(s) to {csv_path}")
    return df


updated_df = append_serpapi_urls_to_csv(r)
print(updated_df.tail(1))


‚ö†Ô∏è No URLs found in SerpAPI output ‚Äî nothing to append.
                                 framework  \
0  Contexts and Dependency Injection (CDI)   

                     source URL (JetBrains web page)  \
0  https://www.jetbrains.com.cn/en-us/help/inspec...   

  brief_description of the content in the source URL  \
0  The JetBrains Inspectopedia page provides info...   

                                GPT-4 generated_rule  \
0  Rule incorrect-bean-definitions {\n  for (file...   

               generated_rule_explanation from GPT-4  \
0  This rule, named 'incorrect-bean-definitions',...   

   new_built-in_functions_explanation if any  \
0                                        NaN   

                                PRE_model_validation  \
0  The rule is syntactically correct and follows ...   

  3rd-party most relevant URLs and summary for each URL  \
0  1. https://www.baeldung.com/cdi-qualifiers: Th...      

   POST_model validation SerpAPI-found URLs  
0                 

In [20]:
# ### STILL NOT THE ANSWER WEB PAGE IS LOCATED!!! ###
# ## I thought I need (1) better query construction, (2) multi-query searching, and (3) a simple reranker.
# # Below is a drop-in that fixes bugs, adds strong Google operators, runs several candidates, and
# # ranks results by ‚ÄúHibernate/@Find‚Äù-likeness. I also add clean exclusions + host priorities.

# from __future__ import annotations
# import os, re, time, json, math
# from typing import List, Optional, Set, Dict, Iterable, Tuple
# from urllib.parse import urlparse
# import requests

# # -------------------------------
# # Config
# # -------------------------------
# SERPAPI_API_KEY = userdata.get("SERPAPI_API_KEY")
# DEFAULT_EXCLUDES: Set[str] = {
#     "facebook.com","x.com","twitter.com","pinterest.com","youtube.com",
#     "linkedin.com","tiktok.com","instagram.com","buffercdn.com"
# }

# # Prefer official docs that likely contain @Find
# HOST_PRIORITY: Dict[str, int] = {
#     "docs.jboss.org": 100,             # Hibernate javadocs
#     "hibernate.org": 95,               # Hibernate website
#     "jakarta.ee": 80,                  # EE docs
#     "jetbrains.com": 70,               # Inspectopedia/IDEA docs
#     "github.com": 40,                  # Code often useful
# }

# # -------------------------------
# # Helpers
# # -------------------------------
# def host_of(url: str) -> str:
#     try:
#         return urlparse(url).netloc.lower()
#     except Exception:
#         return ""

# def is_excluded(url: str, excluded_hosts: Set[str]) -> bool:
#     h = host_of(url)
#     return any(h == ex or h.endswith(f".{ex}") for ex in excluded_hosts)

# def uniq_preserve_order(seq: Iterable[str]) -> List[str]:
#     seen = set()
#     out = []
#     for s in seq:
#         if s not in seen:
#             seen.add(s)
#             out.append(s)
#     return out

# # -------------------------------
# # SerpAPI search (multi-query)
# # -------------------------------
# def serpapi_search(query: str, exclude_domains: Optional[Set[str]] = None, num: int = 10) -> List[dict]:
#     """
#     Returns a *list of result dicts* (we keep title/snippet for reranking).
#     """
#     if not SERPAPI_API_KEY:
#         print("‚ö†Ô∏è No SerpAPI key provided.")
#         return []

#     excludes = set(DEFAULT_EXCLUDES)
#     if exclude_domains:
#         excludes |= set(exclude_domains)

#     params = {"engine": "google", "q": query, "api_key": SERPAPI_API_KEY, "num": str(num)}
#     try:
#         r = requests.get("https://serpapi.com/search", params=params, timeout=20)
#         r.raise_for_status()
#         data = r.json()
#         out = []
#         for item in data.get("organic_results", []):
#             url = item.get("link")
#             if not url:
#                 continue
#             if is_excluded(url, excludes):
#                 continue
#             out.append({
#                 "url": url,
#                 "title": item.get("title", ""),
#                 "snippet": item.get("snippet", ""),
#                 "position": item.get("position"),
#             })
#         return out
#     except Exception as e:
#         print(f"‚ö†Ô∏è SerpAPI request failed: {e}")
#         return []

# # -------------------------------
# # Query building
# # -------------------------------
# def normalize_space(s: str) -> str:
#     return re.sub(r"\s+", " ", s or "").strip()

# def key_terms_from_text(text: str, k: int = 8) -> List[str]:
#     """
#     Cheap, dependency-free keyterm picker: keep tokens with @, camelCase, or java-ish/Dot terms; fallback to frequent words.
#     Replace with RAKE/TextRank if you like.
#     """
#     if not text:
#         return []
#     toks = re.findall(r"[@\w\.]+", text)
#     # keep distinctive tokens
#     candidates = [t for t in toks if len(t) > 2]
#     # light heuristic boosts
#     def score(t: str) -> float:
#         s = 0.0
#         if t.startswith("@"): s += 3
#         if "." in t: s += 1.5   # package/class names
#         if re.search(r"[A-Z][a-z]+[A-Z]", t): s += 1.0  # camelCase
#         if t.lower() in {"find","finder","annotation","hibernate","entity","typedquery","selectionquery"}:
#             s += 1.2
#         return s
#     ranked = sorted(candidates, key=lambda t: (score(t), len(t)), reverse=True)
#     out = uniq_preserve_order(ranked)[:k]
#     return out

# def build_candidate_queries(framework: str, topic: str, jetbrains_intro: str, gpt5_expl: str) -> List[str]:
#     """
#     Generate several strong Google queries with operators. We‚Äôll try them all and then rerank results.
#     """
#     ctx = normalize_space(" ".join([framework or "", topic or "", jetbrains_intro or "", gpt5_expl or ""]))
#     terms = key_terms_from_text(ctx)
#     joined = " ".join(terms) if terms else ctx

#     # Pay special attention to '@Find' literal
#     base = '"@Find" hibernate annotation'

#     candidates = [
#         # precision: official javadocs
#         f'site:docs.jboss.org {base}',
#         f'site:hibernate.org {base}',
#         # title/url hints
#         f'intitle:@Find hibernate annotation',
#         f'inurl:hibernate intitle:@Find annotation',
#         # allintext to bind concepts
#         f'allintext:@Find hibernate entity finder {joined}',
#         # combine with JetBrains if trying to verify Inspectopedia rule
#         f'site:jetbrains.com Inspectopedia "@Find" hibernate',
#         # fallback broad
#         f'{base} {joined}',
#     ]

#     # de-duplicate + keep short, valid strings
#     return uniq_preserve_order([normalize_space(c) for c in candidates if c.strip()])

# # -------------------------------
# # Lightweight reranking
# # -------------------------------
# def feature_score(result: dict) -> float:
#     """
#     Score by: host priority, presence of '@Find' in title/snippet/url,
#     path hints like '/annotations/processing/Find'.
#     """
#     url, title, snip = result.get("url",""), result.get("title",""), result.get("snippet","")
#     host = host_of(url)
#     score = HOST_PRIORITY.get(host, 0)

#     text = " ".join([url, title, snip]).lower()
#     boosts = [
#         (r"@find", 15),
#         (r"\bhibernate\b", 8),
#         (r"\bannotation(s)?\b", 5),
#         (r"/annotations/processing/find", 12),
#         (r"/javadocs/", 10),
#         (r"\bentity\b", 4),
#     ]
#     for pat, w in boosts:
#         if re.search(pat, text):
#             score += w

#     # slight boost for earlier rank
#     pos = result.get("position")
#     if isinstance(pos, int):
#         score += max(0, 10 - pos)

#     return float(score)

# def rerank(results: List[dict]) -> List[dict]:
#     return sorted(results, key=feature_score, reverse=True)

# # -------------------------------
# # Orchestrator
# # -------------------------------
# def search_framework_find(framework: str, topic: str,
#                           jetbrains_intro: str, gpt5_expl: str,
#                           exclude_domains: Optional[Set[str]] = None,
#                           per_query: int = 10) -> List[dict]:
#     queries = build_candidate_queries(framework, topic, jetbrains_intro, gpt5_expl)

#     all_results: List[dict] = []
#     for q in queries:
#         batch = serpapi_search(q, exclude_domains=exclude_domains, num=per_query)
#         all_results.extend(batch)
#         time.sleep(0.2)  # be polite; SerpAPI handles rate limiting but avoid bursts

#     # Deduplicate by URL
#     dedup: Dict[str, dict] = {}
#     for r in all_results:
#         dedup[r["url"]] = r
#     ranked = rerank(list(dedup.values()))
#     return ranked[:20]  # top-N


# framework = FRAMEWORK            # e.g., "Hibernate ORM 6.x"
# topic = TOPIC                    # e.g., "Finder methods with @Find"
# jetbrains_intro = load_new_rule_explanation(GPT5_RULE_DESCRIPTION_PATH)   # scraped from JetBrains web page

# gpt5_expl = load_gpt5_rule_description(newRULE_EXPLANATIONS_PATH)              # your own summary, optional

# top = search_framework_find(framework, topic, jetbrains_intro, gpt5_expl)
# for r in top[:5]:
#     print(r["url"], "‚Äî", r["title"])

Yes, the answer web page was "https://docs.jboss.org/hibernate/orm/7.0/javadocs/org/hibernate/annotations/processing/Find.html"

In [21]:
##### TO-DOs ######
# What if the one generated rule is too big?
# Save the generated rule in json file or text file first then consult with Codex-GPT-5
# To optimize the propmt, look into DsPY: https://adasci.org/dspy-streamlining-llm-prompt-optimization/
# Using DsPy, we‚Äôll configure our Language Model (GPT-3.5-turbo) and Retrieval Model (ColBERTv2). These will form the backbone of our RAG system.
# Codex-GPT-5 could help break down the one (big) generated rule if it exceeds N lines - - will this be meaninful in terms of validation and running the MeCheck engine to check more granular rules?
# Then, iterate the validation process with each sub-rule derived from the original rule both with GPT-5 and SerpAPI
# We will Hibernate's incorrect @find annotation rule will be implemented into the current MeCheck engine





In [22]:
####################################################OLD or MERGED FUNCTIONS ##############
#########################################################################################


# To save the gpt5-generated rule description based on the scraped content from the JetBrains webpage
# import os, re, json, requests
# from bs4 import BeautifulSoup
# GPT5_RULE_DESCRIPTION_PATH = "/content/gpt5_rule_description_scraped_content.json"


# # --- Save column ‚Äúgenerated_rule_explanation from GPT-5‚Äù to JSON (list of strings) ---

# generated_explanations = [str(x).strip() for x in df["generated_rule_explanation from GPT-5"].fillna("") if str(x).strip()]
# single_explanation = generated_explanations[0] if generated_explanations else ""
# with open(newRULE_EXPLANATIONS_PATH, "w", encoding="utf-8") as f:
#     json.dump(generated_explanations, f, ensure_ascii=False, indent=2)

# try:
#     os.makedirs(os.path.dirname(newRULE_EXPLANATIONS_PATH), exist_ok=True)
#     with open(newRULE_EXPLANATIONS_PATH, "w", encoding="utf-8") as f:
#         json.dump(
#             {"new_generated_rule_explanation": single_explanation},
#             f,
#             ensure_ascii=False,
#             indent=2
#         )
#     print(f"‚úÖ Saved new_generated_rule_explanation to {newRULE_EXPLANATIONS_PATH}")
# except Exception as e:
#     print(f"‚ùå Failed to save JSON: {e}")


# def fetch_text(url: str, timeout: int = 20) -> str:
#     """
#     Fetches text content from the SOURCE URL.
#     Cleans out scripts, styles, and noscript tags.
#     """
#     url = SOURCE  # Always use the global SOURCE variable
#     resp = requests.get(url, timeout=timeout)
#     resp.raise_for_status()
#     soup = BeautifulSoup(resp.text, "html.parser")
#     for tag in soup(["script", "style", "noscript"]):
#         tag.decompose()
#     text = soup.get_text(separator="\n")
#     text = re.sub(r"\n\s*\n+", "\n\n", text).strip()
#     return text


# def source_intro_saved_json(url: str, path: str = GPT5_RULE_DESCRIPTION_PATH) -> str:
#     """
#     Scrapes and saves only the portion of the JetBrains Inspectopedia page
#     before the 'Locating this inspection' section into a JSON file.

#     The text is also returned so you can preview or use it immediately.
#     """
#     try:
#         full = fetch_text(url)
#     except Exception as e:
#         print(f"‚ùå Failed to fetch text from {url}: {e}")
#         return ""

#     # Find the section before "Locating this inspection"
#     m = re.search(r"^\s*Locating\s+this\s+inspection\b", full, flags=re.IGNORECASE | re.MULTILINE)
#     if m:
#         content_before = full[:m.start()].strip()
#     else:
#         content_before = full.strip()

#     # Save to JSON file
#     try:
#         os.makedirs(os.path.dirname(path), exist_ok=True)
#         with open(path, "w", encoding="utf-8") as f:
#             json.dump(
#                 {"jetbrains_scraped_rule_description": content_before},
#                 f,
#                 ensure_ascii=False,
#                 indent=2
#             )
#         print(f"‚úÖ Saved JetBrains scraped rule description context to {path}")
#     except Exception as e:
#         print(f"‚ùå Failed to save JSON: {e}")

#     return content_before


# def load_new_rule_explanation(path: str = GPT5_RULE_DESCRIPTION_PATH) -> str:
#     """
#     Loads the previously saved GPT-5 rule description for contextual use in SerpAPI searches.
#     Returns an empty string if none exists or file is corrupted.
#     """
#     if not os.path.exists(path):
#         print("‚ö†Ô∏è No saved GPT-5 rule description found.")
#         return ""
#     try:
#         with open(path, "r", encoding="utf-8") as f:
#             data = json.load(f)
#         return data.get("new_generated_rule_explanation", "")
#     except Exception as e:
#         print(f"‚ö†Ô∏è Failed to load saved description: {e}")
#         return ""

# def load_gpt5_rule_description(path: str = newRULE_EXPLANATIONS_PATH) -> str:
#     """
#     Loads the new-GPT-5-generated rule explanation to provide context to SerpAPI later.
#     Returns an empty string if none exists or file is corrupted.
#     """
#     if not os.path.exists(path):
#         print("‚ö†Ô∏è No new-GPT-5 rule explanation found.")
#         return ""
#     try:
#         with open(path, "r", encoding="utf-8") as f:
#             data = json.load(f)
#         return data.get("generated_rule_explanation from GPT-5", "")
#     except Exception as e:
#         print(f"‚ö†Ô∏è Failed to load saved description: {e}")
#         return ""

# # Step 1: Scrape and save
# context_text = source_intro_saved_json(SOURCE)
# print("\n--- Preview of saved content ---\n")
# print(context_text[:500], "...")  # show the first 500 chars

# # Step 2: Load later (e.g., in a new session)
# loaded_text = load_new_rule_explanation(GPT5_RULE_DESCRIPTION_PATH)
# print("\n--- Loaded back from JSON scraped from JetBrains  ---\n")
# print(loaded_text[:500], "...")

# # Step 3: Load later (e.g., in a new session)
# loaded_text1 = load_gpt5_rule_description(newRULE_EXPLANATIONS_PATH)
# print("\n--- Loaded back from JSON from gpt5_rule_explanation ---\n")
# print(loaded_text1[:500], "...")

#source_intro_before_locating(SOURCE)


# def fetch_text(url: str, timeout: int = 20) -> str:
#     url = SOURCE
#     resp = requests.get(url, timeout=timeout)
#     resp.raise_for_status()
#     soup = BeautifulSoup(resp.text, "html.parser")
#     for tag in soup(["script", "style", "noscript"]):
#         tag.decompose()
#     text = soup.get_text(separator="\n")
#     text = re.sub(r"\n\s*\n+", "\n\n", text).strip()
#     return text

# def source_intro_before_locating(url: str) -> str:
#     """
#     Returns ONLY the part of the page before the 'Locating this inspection' section.
#     Matches the header case-insensitively and robustly.
#     """
#     full = fetch_text(url)
#     # Some pages include non-breaking / special spaces; keep the match flexible.
#     m = re.search(r"^\s*Locating\s+this\s+inspection\b", full, flags=re.IGNORECASE | re.MULTILINE)
#     if m:
#         return full[:m.start()].strip()
#         #print(full.strip())

#         with open(path, "w", encoding="utf-8") as f:
#              json.dump(full.strip(), f, ensure_ascii=False, indent=2)
#              print(f"‚úÖ Saved GPT-5 rule description context to {path}")
#     return(full.strip())



# def load_gpt5_rule_description(path: str = GPT5_RULE_DESCRIPTION_PATH) -> str:
#     """
#     Loads the previously saved GPT-5 rule description for contextual use in SerpAPI searches.
#     Returns an empty string if none exists.
#     """
#     if not os.path.exists(path):
#         return ""
#     try:
#         with open(path, "r", encoding="utf-8") as f:
#             data = json.load(f)
#         return data.get("gpt5_generated_rule_description", "")
#     except Exception:
#         return ""


In [23]:
# ###########################################
# #### OLD CODES ######### DO NOT RUN!!!
# ########################################
# # --- Rule Validation Utilities (Using SerpAPI) ---
# import requests, time, html, re
# from typing import List, Dict, Set, Optional
# from bs4 import BeautifulSoup

# SERPAPI_API_KEY = userdata.get("SERPAPI_API_KEY")

# DEFAULT_EXCLUDES = {
#     "jetbrains.com",
#     "www.jetbrains.com",
#     "jetbrains.com.cn",
#     "www.jetbrains.com.cn",
# }

# def is_excluded(url: str, exclude_domains: Set[str]) -> bool:
#     if not exclude_domains:
#         return False
#     u = url.lower()
#     return any(dom.lower() in u for dom in exclude_domains)

# def serpapi_search(query: str, exclude_domains: Set[str] = None) -> List[str]:
#     """
#     Search with SerpAPI and return non-excluded URLs.
#     JetBrains domains are ALWAYS excluded to ensure third-party sources.
#     """
#     if not SERPAPI_API_KEY:
#         return []
#     try:
#         # merge user excludes with default JetBrains excludes
#         merged_excludes = set(DEFAULT_EXCLUDES)
#         if exclude_domains:
#             merged_excludes |= set(exclude_domains)

#         params = {"engine": "google","q": query,"api_key": SERPAPI_API_KEY,"num": "10"}
#         resp = requests.get("https://serpapi.com/search", params=params, timeout=20)
#         data = resp.json()
#         results = []
#         for item in data.get("organic_results", []):
#             url = item.get("link")
#             if url and not is_excluded(url, merged_excludes):
#                 results.append(url)
#         return results
#     except Exception:
#         return []

# # def build_query_from_rule(rule_text: str) -> str:
# #     tokens = [t for t in re.split(r'\W+', rule_text) if t]
# #     key = " ".join(tokens[:8])
# #     return f"{key} error inspection rule"

# # def build_query_from_rule(rule_text: str) -> str:
# #     m = re.search(r'Rule\s+([\w\-]+)', rule_text)
# #     print("Rule name:", m.group(1) if m else "unavailable")
# #     rule_name = m.group(1) if m else f"{TOPIC}"
# #     return f"{rule_name} {TOPIC}"
# def build_query_from_rule(rule: dict | str) -> str:
#     """
#     Build a web search query using contextual information derived from:
#       - The JetBrains Inspectopedia SOURCE page (up to 'Locating this inspection')
#       - Its summarized content via GPT-5
#       - The given rule (dict or string)

#     The resulting query is semantically focused on the rule‚Äôs core description,
#     not its literal name, and ends with FRAMEWORK and TOPIC.
#     """
#     # 1Ô∏è‚É£ Extract rule text (fallback to string)
#     if isinstance(rule, dict):
#         base_text = rule.get("rule") or rule.get("name") or rule.get("description") or ""
#         print("base_text: ", base_text)
#     else:
#         base_text = str(rule)

#     # 2Ô∏è‚É£ Fetch JetBrains Inspectopedia page content
#     try:
#         full_text = _fetch_text(SOURCE)
#         # Keep only the portion before "Locating this inspection"
#         cutoff = re.search(r"Locating\s+this\s+inspection", full_text, flags=re.IGNORECASE)
#         if cutoff:
#             source_text = full_text[:cutoff.start()]
#         else:
#             source_text = full_text
#         source_text = source_text.strip()[:20000]  # safety cap
#     except Exception as e:
#         source_text = f"(JetBrains fetch error: {e})"

#     # 3Ô∏è‚É£ Summarize JetBrains content (no temperature param)
#     try:
#         prompt = (
#             "Summarize the following JetBrains Inspectopedia documentation "
#             "in 1-2 sentences focusing on the core inspection purpose or behavior:\n\n"
#             f"{source_text}"
#         )
#         comp = client.chat.completions.create(
#             model="gpt-5",
#             messages=[{"role": "user", "content": prompt}],
#         )
#         jb_summary = comp.choices[0].message.content.strip()
#     except Exception as e:
#         jb_summary = f"Summary unavailable (GPT error: {e})"

#     # 4Ô∏è‚É£ Merge rule text + JetBrains summary ‚Üí keywords
#     tokens = [t for t in re.split(r'\W+', f"{jb_summary} {base_text}") if t]
#     key = " ".join(tokens[:10]) if tokens else FRAMEWORK

#     # 5Ô∏è‚É£ Return final query anchored to FRAMEWORK and TOPIC
#     return f"{key} {FRAMEWORK} {TOPIC}"



# def validate_rule_via_serpapi(rule_text: str,
#                               exclude_domains: Optional[Set[str]] = None,
#                               max_urls: int = 2) -> Dict[str, object]:
#     q = build_query_from_rule(rule_text)
#     urls = serpapi_search(q, exclude_domains or set())
#     return {"query": q, "urls": urls[:max_urls]}

# # JETBRAINS_DOC_URL = "https://www.jetbrains.com.cn/en-us/help/inspectopedia/MnUnresolvedPathVariable.html"

# def _fetch_text(url: str, timeout: int = 20) -> str:
#     resp = requests.get(url, timeout=timeout)
#     resp.raise_for_status()
#     soup = BeautifulSoup(resp.text, "html.parser")
#     for tag in soup(["script","style","noscript"]):
#         tag.decompose()
#     text = soup.get_text(separator="\n")
#     text = re.sub(r"\n\s*\n+", "\n\n", text).strip()
#     return text

# def summarize_third_party(url: str, client, model: str = "gpt-5", temperature: float = 1) -> str:
#     """
#     Fetch a third-party URL and summarize its content briefly.
#     """
#     try:
#         content = _fetch_text(url)[:20000]
#     except Exception as e:
#         return f"Summary unavailable (fetch error: {e})"
#     prompt = (
#         "Summarize the following web page in 2-3 concise sentences, "
#         "focusing on information relevant to static analysis rules or inspections.\n\n"
#         f"Content:\n{content}"
#     )
#     try:
#         comp = client.chat.completions.create(
#             model=model,
#             messages=[{"role":"user","content":prompt}],
#             temperature=temperature,
#         )
#         return comp.choices[0].message.content.strip()
#     except Exception as e:
#         return f"Summary unavailable (GPT error: {e})"

# def secondary_validation_via_jetbrains_gpt(rule_text: str,
#                                            client,
#                                            model: str = "gpt-5"
#                                            ) -> str:
#     """
#     Uses the JetBrains Inspectopedia page to validate the rule in one sentence.
#     """
#     try:
#         source_text = _fetch_text(SOURCE)[:30000]
#     except Exception as e:
#         return f"Secondary validation skipped (JetBrains fetch error: {e})"

#     system_msg = (
#         "You are a precise static-analysis assistant. Validate a proposed rule "
#         "STRICTLY using the provided JetBrains Inspectopedia excerpt. "
#         "Respond with a single short sentence stating whether the rule aligns with the doc and why."
#     )
#     user_msg = (
#         f"Rule to validate:\n{rule_text}\n\n"
#         f"JetBrains Inspectopedia source (excerpt):\n{source_text}"
#     )
#     try:
#         comp = client.chat.completions.create(
#             model=model,
#             messages=[
#                 {"role": "system", "content": system_msg},
#                 {"role": "user", "content": user_msg},
#             ],
#             #temperature=temperature,
#         )
#         return comp.choices[0].message.content.strip()
#     except Exception as e:
#         return f"Secondary validation failed to run GPT: {e}"

# def combined_secondary_validation(rule_text: str,
#                                   third_party_text: str,
#                                   client,
#                                   model: str = "gpt-5"
#                                  ) -> str:
#     """
#     Validate the rule using BOTH JetBrains Inspectopedia (fetched live) and the third-party page text.
#     Output: one short sentence that references alignment or mismatch.
#     """
#     try:
#         jb_text = _fetch_text(SOURCE)[:20000]
#     except Exception as e:
#         jb_text = f"(JetBrains fetch error: {e})"
#     system_msg = (
#         "You are a precise static-analysis assistant. Validate the proposed rule "
#         "STRICTLY using the provided JetBrains Inspectopedia excerpt AND the provided third-party page. "
#         "Respond with one short sentence stating whether the rule is consistent with these sources and why."
#     )
#     user_msg = (
#         f"Rule:\n{rule_text}\n\n"
#         f"JetBrains Inspectopedia excerpt:\n{jb_text}\n\n"
#         f"Third-party page excerpt:\n{third_party_text[:8000]}"
#     )
#     try:
#         comp = client.chat.completions.create(
#             model=model,
#             messages=[
#                 {"role": "system", "content": system_msg},
#                 {"role": "user", "content": user_msg},
#             ],
#            # temperature=temperature,
#         )
#         return comp.choices[0].message.content.strip()
#     except Exception as e:
#         return f"Combined validation failed (GPT error: {e})"

# print("‚úÖ Helpers updated: SerpAPI now excludes JetBrains; added summarization and combined validation")


In [24]:
# ###########################################
# #### OLD CODES ######### DO NOT RUN
# ########################################

# # --- Demo: validate first generated rule (if available) ---
# try:
#     sample_rule = rules[0] if isinstance(rules, list) else rules
#     if isinstance(sample_rule, dict) and "rule" in sample_rule:
#         text = sample_rule["rule"]
#     else:
#         text = json.dumps(sample_rule) if sample_rule else "unavailable"
#     print("Rule sample:", str(text)[:200], "...")
#     print("\n[Primary] SerpAPI URLs:")
#     out = validate_rule_via_serpapi(str(text), exclude_domains={"jetbrains.com","jetbrains.com.cn","microsoft.com"})
#     for u in out["urls"]:
#         print(" -", u)
#     print("\n[Secondary] GPT-5 x JetBrains Inspectopedia:")
#     print(secondary_validation_via_jetbrains_gpt(str(text), client))
# except Exception as e:
#     print("Demo skipped:", e)


In [25]:
# ###########################################
# #### OLD CODES ######### DO NOT RUN
# ########################################
# # --- Post-processing: save third-party URL, its summary, and GPT validation into CSV ---
# import pandas as pd
# import json

# def _get_rule_text(x):
#     if isinstance(x, dict):
#         # try typical keys
#         for k in ("rule", "name", "title", "description"):
#             if k in x:
#                 return str(x[k])
#         return json.dumps(x)
#     return str(x)

# # Decide input set
# try:
#     df = pd.read_csv("rsl_generated_rules.csv")
#     # If rules variable exists, prefer it (might be fresher) and rebuild df
#     if "rules" in globals():
#         candidate = rules
#         if isinstance(candidate, list):
#             # try to preserve columns if dicts
#             df = pd.DataFrame(candidate)
#         else:
#             df = pd.DataFrame([candidate])
# except FileNotFoundError:
#     # fall back to rules in-memory
#     if "rules" in globals():
#         candidate = rules
#         if isinstance(candidate, list):
#             df = pd.DataFrame(candidate)
#         else:
#             df = pd.DataFrame([candidate])
#     else:
#         raise RuntimeError("No rules found. Generate rules before running this cell.")

# if df.empty:
#     raise RuntimeError("Rules dataframe is empty.")

# third_urls = []
# third_summaries = []
# gpt_validations = []

# for idx, row in df.iterrows():
#     rule_text = _get_rule_text(row.to_dict())
#     out = validate_rule_via_serpapi(rule_text, exclude_domains=set())  # JetBrains excluded by default
#     url = out["urls"][0] if out["urls"] else ""
#     third_urls.append(url)

#     if url:
#         try:
#             page_text = _fetch_text(url)[:20000]
#         except Exception as e:
#             page_text = f"(fetch error: {e})"
#         # summarize
#         summ = summarize_third_party(url, client)
#         third_summaries.append(summ)
#         # combined validation (JetBrains + third-party content)
#         valid = combined_secondary_validation(rule_text, page_text, client)
#         gpt_validations.append(valid)
#     else:
#         third_summaries.append("No third-party page found")
#         gpt_validations.append("No third-party page; combined validation skipped")

# # Add/update columns
# df["third_party_url"] = third_urls
# df["third_party_summary"] = third_summaries
# df["gpt_combined_validation"] = gpt_validations

# # Save back
# df.to_csv("rsl_generated_rules.csv", index=False)
# print("‚úÖ Updated rsl_generated_rules.csv with third_party_url, third_party_summary, gpt_combined_validation")
