Steps in this workbook
Step 1: Load the llm using langchain. Test the llm to see if its connected
Step 2: Load mitre attack json files and extract the mitigations, software, and groups for technique Brute Force Password Guessing
Step 3: use langchain to write rule detection guidelines with tools and save them in a json file 
Step 4: Load the downloaded csv file contraining cybersecurity intrusion data, load threat detection guidelines
Step 5: use langchain to filter logs that are attacks 
Step 6: do an evaluation on accuracy percentage


In [None]:
# Step 1: Define LLM, in this case we use chatgpt-3b. Test if it works.
import os
os.environ["OPENAI_API_KEY"] = "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"

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    # temperature=0
)

response = llm.invoke([
    HumanMessage(content="What century are we in?")
])
print(response.content)

We are currently in the 21st century.


In [None]:
# Step 2: open and download mitre attack json file from github
# Install required lib if needed:
# pip install stix2

from stix2 import MemoryStore, Filter
import requests, json, os

# Download latest STIX file once (local copy)
# url = "https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/enterprise-attack/enterprise-attack.json"
filename = "enterprise-attack.json"

# if not os.path.exists(filename):
#     print("Downloading enterprise ATT&CK JSON...")
#     r = requests.get(url)
#     with open(filename, "wb") as f:
#         f.write(r.content)

# Load into MemoryStore
src = MemoryStore()
src.load_from_file(filename)

# Get the technique object by ATT&CK ID T1110.001
tech = src.query([
    Filter("type", "=", "attack-pattern"),
    Filter("external_references.external_id", "=", "T1110.001")
])[0]

print("Technique:", tech["name"])
print("Description:", tech["description"][:200], "...\n")

# Get related objects (mitigations, groups, software)
rels = src.relationships(tech["id"], "uses", source_only=False)
related_ids = {r["source_ref"] if r["target_ref"] == tech["id"] else r["target_ref"] for r in rels}
related_objs = [src.get(rid) for rid in related_ids if src.get(rid)]

print("Found related objects:", len(related_objs))
for obj in related_objs[:5]:
    print("-", obj["type"], obj.get("name", obj.get("id")))

# Save relevant subset
subset = [tech] + related_objs + rels  # technique + related objects + relationships

# stix2 objects are not plain dicts, so convert them first
def to_dict(obj):
    if isinstance(obj, dict):
        return obj
    try:
        return obj.serialize()
    except Exception:
        return obj

outfile = "T1110.001_subset.json"
with open(outfile, "w") as f:
    json.dump([to_dict(obj) for obj in subset if obj], f, indent=2)

print(f"\nSaved {len(subset)} objects into {outfile}")


Technique: Password Guessing
Description: Adversaries with no prior knowledge of legitimate credentials within the system or environment may guess passwords to attempt access to accounts. Without knowledge of the password for an account, an a ...

Found related objects: 11
- malware P.A.S. Webshell
- intrusion-set APT29
- tool CrackMapExec
- malware HermeticWizard
- malware Lucifer

Saved 23 objects into T1110.001_subset.json


In [None]:
# Step 3: Use langchain to write rule detection by giving our LLM some tools
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, ToolMessage

@tool
def internal_lookup(query: str) -> str:
    """Look up info from local file 'T1110.001_subset.json'."""
    with open("T1110.001_subset.json", "r") as f:
        for line in f:
            if query.lower() in line.lower():
                return line.strip()
    return "No matching internal note found."

tools = [internal_lookup]
llm_with_tools = llm.bind_tools(tools)

# Step 1: User query
query = "You are an intelligent threat hunter. Based on this file that covers the mitigations, software, and groups for technique Brute Force subtechnique Password Guessing (T1110.001), can you write simple threat detection rules"
messages = [HumanMessage(content=query)]

# Step 2: Let LLM decide if tool is needed
response = llm_with_tools.invoke(messages)
messages.append(response)

# Step 3: Run tools if any
if response.tool_calls:
    for tool_call in response.tool_calls:
        selected_tool = {
            "internal_lookup": internal_lookup
        }[tool_call["name"]]

        tool_output = selected_tool.invoke(tool_call["args"])
        messages.append(
            ToolMessage(content=tool_output, tool_call_id=tool_call["id"])
        )

    # Step 4: Let LLM complete reasoning
    final_response = llm_with_tools.invoke(messages)
    print("Final answer:", final_response.content)

    #Save the rules
    with open("generated_rules.json", "w") as f:
        f.write(response.content.strip() + "\n")

    print("Rules saved to generated_rules.json")

else:
    print("No tools needed. Answer:", response.content)

Final answer: Based on the information for the technique Brute Force subtechnique Password Guessing (T1110.001), here are some simple threat detection rules that you can use to monitor for potential brute force password guessing attacks:

1. **Monitor Authentication Logs**: 
   - Monitor authentication logs for system and application login failures of valid accounts. 
   - If authentication failures are high, it may indicate a brute force attempt to gain access to a system using legitimate credentials.
   
2. **Watch for High Number of Failed Login Attempts**:
   - Set up alerts for a high number of failed login attempts within a short period of time.
   - This could be an indicator of an adversary attempting a brute force attack to guess passwords.

3. **Detect Unusual Login Patterns**:
   - Look for unusual login patterns such as multiple failed login attempts followed by a successful login.
   - Anomalies in login behavior could signify a brute force attack in progress.

4. **Monito

In [None]:
# Step 4: Load logs and threat detection guidelines
# Load Csv file with logs
import pandas as pd
logs_df = pd.read_csv("cybersecurity_intrusion_data.csv") 

# Load rules written by the LLM
with open("generated_rules.json", "r") as f:
    rules_text = f.read()

In [36]:
from langchain_core.messages import HumanMessage
from tqdm import tqdm

log_features = [
    "session_id", "network_packet_size", "protocol_type", "login_attempts",
    "session_duration", "encryption_used", "ip_reputation_score", "failed_logins",
    "browser_type", "unusual_time_access"
]

def classify_log(row):
    log_info = "\n".join([f"{col}: {row[col]}" for col in log_features])
    prompt = f"""
You are a threat detection assistant. Given the following cybersecurity log:
{log_info}

Based on the following detection rules/guidelines:
{rules_text}

Determine if this log represents an attack or not. Answer with "1" as attack or "0" as benign only.
"""
    response = llm.invoke([HumanMessage(content=prompt)])
    prediction = response.content.strip()
    return prediction

predictions = []
for i, (index, row) in enumerate(logs_df.head(100).iterrows()):
    pred = classify_log(row)
    predictions.append(pred)
    # Print first 5 or every log as it predicts
    if i < 5:
        print(f"Session {row['session_id']} predicted as: {pred}")

logs_df.loc[:99, "predicted_label"] = predictions  # only first 100
print("\nClassified logs saved to 'classified_logs.csv'")

Session SID_00001 predicted as: 0
Session SID_00002 predicted as: 1
Session SID_00003 predicted as: 1
Session SID_00004 predicted as: 0
Session SID_00005 predicted as: 0

Classified logs saved to 'classified_logs.csv'


In [37]:
from sklearn.metrics import accuracy_score, classification_report
# Ensure labels are lowercase strings
logs_df["attack_detected"] = logs_df["attack_detected"].astype(str).str.lower()
logs_df["predicted_label"] = logs_df["predicted_label"].astype(str).str.lower()

accuracy = accuracy_score(logs_df["attack_detected"], logs_df["predicted_label"])
print(f"Accuracy: {accuracy*100:.2f}%\n")
print("Detailed classification report:\n")
print(classification_report(logs_df["attack_detected"], logs_df["predicted_label"]))

Accuracy: 0.45%

Detailed classification report:

                                                                                                                                                                                                                                                                                                                                                                                                                                                 precision    recall  f1-score   support

                                                                                                                                                                                                                                                                                                                                                                                                                                              0       0.60      0.00      0.00      5273
   

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
