# AI SOC Agent
**Objective:**  
An autonomous multi-agent system designed to automate Security Operations Center (SOC) workflows. It triages incoming alerts, performs real-time threat intelligence lookups, conducts web research, and generates professional incident reports to assist human analysts in rapid decision-making.

**Key features:**

- Autonomous Investigation: The system doesn't just flag an alert; it investigates the reputation and context of the threat before alerting a human.

- Real-time Threat Intel: By integrating live web search, the agent can identify brand-new threats that weren't in its original training data.

- Automated Reporting: Reduces the time spent on administrative "Tier 1" documentation by 90%, allowing analysts to focus on active mitigation.

## 1 Architecture

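The project is modeled as a directed graph in which nodes (agents) collaborate by updating a shared "State". LangGraph permits cycles in this graph, although the workflow built in this notebook runs as a linear pipeline: triage, then research, then response.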
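
The shared-state idea can be illustrated without LangGraph at all. The following is a minimal pure-Python sketch, not the project's code: the `triage` and `research` functions, the `run` helper, and the `lookups` counter are hypothetical, and the merge rule (append messages, overwrite every other key) is an assumption chosen for illustration. It shows how a node can route back to an earlier node when more information is needed.

```python
from typing import Callable, Dict, List, TypedDict

class State(TypedDict, total=False):
    messages: List[str]   # investigation history
    next_step: str        # name of the node to run next, or "end"
    lookups: int          # hypothetical counter used to stop the loop

def triage(state: State) -> State:
    # Ask for research first; finish once one lookup has been done.
    if state.get("lookups", 0) < 1:
        return {"messages": ["triage: need more context"], "next_step": "research"}
    return {"messages": ["triage: enough context"], "next_step": "end"}

def research(state: State) -> State:
    # Route back to triage, which is what forms the cycle.
    return {
        "messages": ["research: found context"],
        "lookups": state.get("lookups", 0) + 1,
        "next_step": "triage",
    }

NODES: Dict[str, Callable[[State], State]] = {"triage": triage, "research": research}

def run(entry: str, max_steps: int = 10) -> State:
    state: State = {"messages": [], "next_step": entry}
    for _ in range(max_steps):  # hard cap so a faulty cycle cannot run forever
        step = state["next_step"]
        if step == "end":
            break
        update = NODES[step](state)
        # Merge: append new messages, overwrite the other keys.
        merged = state["messages"] + update.get("messages", [])
        state = {**state, **update, "messages": merged}
    return state

final = run("triage")
print(final["messages"])
```

The `max_steps` cap is a design choice worth keeping in any cyclic workflow, since a routing mistake would otherwise loop indefinitely.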

1. The Orchestration Layer
- LangGraph: Used for state management and workflow orchestration. Unlike linear chains, LangGraph allows for cyclic patterns, enabling agents to revisit previous steps if more information is needed.

- Shared State: A TypedDict that acts as the "digital notebook," storing the investigation history and routing instructions across the entire team.

2. Specialized Agent Nodes
Each node is a specialized worker focused on a single part of the incident lifecycle:

- Triage Agent: Analyzes raw logs (e.g., SSH failed logins) and extracts key indicators of compromise (IoCs) like IP addresses using regex.

- Research Agent (Web Intel): Uses the Tavily API to search the live web for the latest threat actor behaviors, CVEs, or forum discussions related to the detected IoCs.

- Incident Responder Agent: Synthesizes the findings from previous agents to generate a structured SOC report with severity levels and mitigation steps.
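
As an illustration of the triage step on SSH logs, the sketch below counts failed logins per source address. It is a standalone example under stated assumptions: the `failed_login_sources` helper is hypothetical, the sample lines are made up (using documentation-reserved IP ranges), and the `"Failed password"` marker assumes OpenSSH-style log messages. The regex only finds candidates; the standard-library `ipaddress` module does the strict validation.

```python
import ipaddress
import re
from collections import Counter
from typing import Dict, Iterable

# Loose candidate pattern; ipaddress performs the strict check afterwards.
CANDIDATE_IP = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")

def failed_login_sources(lines: Iterable[str]) -> Dict[str, int]:
    """Count failed SSH logins per valid IPv4 source address."""
    counts: Counter = Counter()
    for line in lines:
        if "Failed password" not in line:
            continue  # ignore successful logins and unrelated lines
        for candidate in CANDIDATE_IP.findall(line):
            try:
                ipaddress.IPv4Address(candidate)
            except ValueError:
                continue  # e.g. 999.1.1.1 is not a real address
            counts[candidate] += 1
    return dict(counts)

# Made-up sample lines for illustration only.
sample = [
    "sshd[101]: Failed password for root from 203.0.113.7 port 4242 ssh2",
    "sshd[102]: Failed password for invalid user admin from 203.0.113.7 port 4243 ssh2",
    "sshd[103]: Accepted password for alice from 198.51.100.9 port 5000 ssh2",
    "sshd[104]: Failed password for root from 999.1.1.1 port 1 ssh2",
]
print(failed_login_sources(sample))
```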





## 2 Frameworks & Tools

- Large Language Model: gpt-4o-mini serves as the "brain," providing reasoning and multi-step planning.

- VirusTotal API: Used by the check_ip_reputation tool to cross-reference IP addresses against 90+ antivirus and blacklisting engines.

- Tavily Search: A search engine optimized for AI agents, providing clean, structured results for RAG (Retrieval-Augmented Generation) without the noise of a standard web search.



## 3 Setup & Imports
Install the dependencies, import the required libraries, and load the API keys.

In [2]:
# Install essential libraries
!pip install -qU langchain langchain-openai langgraph tavily-python requests langchain-tavily langchain-community

import re
import os
import requests
from typing import Annotated, TypedDict, List
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
from google.colab import userdata
from langchain_community.tools.tavily_search import TavilySearchResults

# API Keys
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY') # Set your API keys in Colab's "Secrets" (the key icon on the left)
os.environ["TAVILY_API_KEY"] = userdata.get('TAVILY_API_KEY')
os.environ["VIRUSTOTAL_API_KEY"] = userdata.get('VIRUSTOTAL_API_KEY')

# Initialize the LLM
llm = ChatOpenAI(model="gpt-4o-mini")

# Explicitly define the variable for the tool to access
VIRUSTOTAL_API_KEY = os.environ.get("VIRUSTOTAL_API_KEY")

# Initialize the Tavily tool
# max_results=3 keeps the response concise for SOC alerts
web_search_tool = TavilySearchResults(max_results=3)


## 2 Threat Check Tool

In [3]:
from langchain.tools import tool

VT_IP_URL = "https://www.virustotal.com/api/v3/ip_addresses/{}"

_IPv4_PATTERN = re.compile(
    r"^(?:(?:25[0-5]|2[0-4]\d|1?\d{1,2})\.){3}(?:25[0-5]|2[0-4]\d|1?\d{1,2})$"
)

def _is_valid_ipv4(ip: str) -> bool:
    return bool(_IPv4_PATTERN.match(ip))

@tool
def check_ip_reputation(ip: str) -> str:
    """Checks if an IP address is known for malicious activity using VirusTotal."""
    # Basic validation
    if not ip or not isinstance(ip, str):
        return "Error: an IP address (string) must be provided."

    if not _is_valid_ipv4(ip):
        return f"Error: '{ip}' does not look like a valid IPv4 address."

    if not VIRUSTOTAL_API_KEY:
        return "Error: VirusTotal API key not configured. Set VIRUSTOTAL_API_KEY environment variable."

    headers = {"x-apikey": VIRUSTOTAL_API_KEY}

    try:
        resp = requests.get(VT_IP_URL.format(ip), headers=headers, timeout=10)

        if resp.status_code == 401:
            return "Error: Unauthorized. Check your VirusTotal API key."
        if resp.status_code == 429:
            return "Error: Rate limited by VirusTotal. Try again later."
        if resp.status_code == 404:
            return f"No VirusTotal data found for IP {ip}."
        resp.raise_for_status()

        data = resp.json()

        # Defensive parsing
        attrs = data.get("data", {}).get("attributes", {})
        stats = attrs.get("last_analysis_stats", {})
        malicious = int(stats.get("malicious", 0))
        suspicious = int(stats.get("suspicious", 0))
        harmless = int(stats.get("harmless", 0))
        undetected = int(stats.get("undetected", 0))

        # Optional: include number of engines that analyzed it
        total_engines = malicious + suspicious + harmless + undetected

        if malicious > 0:
            return (
                f"🚨 Alert: IP {ip} is MALICIOUS.\n"
                f"Malicious detections: {malicious} / {total_engines} engines.\n"
                f"Suspicious detections: {suspicious}."
            )

        if suspicious > 0:
            return (
                f"⚠️ Warning: IP {ip} is suspicious.\n"
                f"Suspicious detections: {suspicious} / {total_engines} engines."
            )

        return f"✅ IP {ip} appears clean according to VirusTotal ({total_engines} engines checked)."

    except requests.exceptions.Timeout:
        return "Error: request to VirusTotal timed out."
    except requests.exceptions.RequestException as e:
        # Avoid returning full exception in production; keep it short
        return f"Error querying VirusTotal: {str(e)}"
    except (ValueError, TypeError):
        return "Error: unexpected response format from VirusTotal."
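
The verdict logic in the tool can also be separated from the HTTP call so it is testable without network access or an API key. The sketch below is one possible way to do that, not a required refactor: `classify_stats` is a hypothetical helper, and the sample dictionary is an invented fragment shaped like the `last_analysis_stats` fields the tool reads.

```python
from typing import Any, Dict

def classify_stats(stats: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a last_analysis_stats-style dict into a verdict and counts."""
    keys = ("malicious", "suspicious", "harmless", "undetected")
    # Missing or None values are treated as zero.
    counts = {k: int(stats.get(k, 0) or 0) for k in keys}
    total = sum(counts.values())
    if counts["malicious"] > 0:
        verdict = "malicious"
    elif counts["suspicious"] > 0:
        verdict = "suspicious"
    else:
        verdict = "clean"
    return {"verdict": verdict, "total_engines": total, **counts}

# Invented numbers, for illustration only.
sample_stats = {"malicious": 2, "suspicious": 1, "harmless": 5, "undetected": 2}
result = classify_stats(sample_stats)
print(result["verdict"], result["total_engines"])
```

Keeping the classification pure means the thresholds can be unit-tested and later tuned (for example, requiring more than one malicious detection) without touching the request code.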

## 3 Define the State
The "State" is the shared memory of the SOC team.



In [4]:
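from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # add_messages appends each node's messages instead of overwriting the history
    messages: Annotated[List[BaseMessage], add_messages]
    next_step: str  # Name of the node expected to run next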
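
How a state field is merged depends on whether its `Annotated` metadata carries a reducer function. The sketch below emulates that idea in plain Python so the effect is easy to see; it is a simplified illustration, not LangGraph's internal implementation, and `DemoState` and `apply_update` are hypothetical names.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints

class DemoState(TypedDict):
    messages: Annotated[List[str], operator.add]  # reducer: concatenate lists
    next_step: str                                # no reducer: overwrite

def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a node's partial update into the state, honoring reducers."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        # Only callable metadata counts as a reducer; plain strings are ignored.
        reducer = next((m for m in metadata if callable(m)), None)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged

state = {"messages": ["alert received"], "next_step": "triage"}
state = apply_update(state, {"messages": ["IP looks malicious"], "next_step": "responder"})
print(state)
```

In this emulation the `messages` list grows with each update while `next_step` is simply replaced, which is the behavior an investigation history needs.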



## 4 Define the Agents

We will create three nodes: a triage node for the initial investigation, a research node for web searches (Tavily), and a responder node for the final report.

In [5]:
def triage_node(state: AgentState):
    # Get the last message from the user (the alert log)
    msg = state['messages'][-1].content

    # Simple regex to find an IP in the alert message
    ip_match = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', msg)

    if ip_match:
        target_ip = ip_match.group(0)
        # Use the tool we defined earlier
        res = check_ip_reputation.run(target_ip)
    else:
        res = "No valid IPv4 address found in the log to investigate."

    return {
        "messages": [AIMessage(content=f"Investigation Result: {res}")],
        "next_step": "researcher"  # matches the triage -> researcher edge in the graph
    }

def research_node(state: AgentState):
    """Searches the web for additional threat intelligence context."""
    last_msg = state['messages'][-1].content

    # Extract the core topic or CVE to research
    # For this demo, we'll ask the agent to find news about the investigated IP/threat
    query = f"Latest threat intelligence and known attacks related to: {last_msg}"

    # Execute the web search
    search_results = web_search_tool.invoke({"query": query})

    return {
        "messages": [AIMessage(content=f"Web Research Findings: {search_results}")],
        "next_step": "responder"
    }

def responder_node(state: AgentState):
    # The responder reviews every finding available in the state, not only the latest message
    investigation = "\n\n".join(str(m.content) for m in state['messages'])

    # We use the llm initialized in the Setup & Imports cell
    prompt = f"Based on this investigation: {investigation}, write a professional SOC incident report including severity and recommended next steps."
    response = llm.invoke(prompt)

    return {"messages": [response], "next_step": END}
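
Web search results can be very long, so it may help to build the responder's prompt with a helper that labels each finding and trims oversized ones. The following is a minimal sketch under stated assumptions: `build_report_prompt` and its `max_chars` limit are hypothetical, and the sample findings are invented.

```python
from typing import Iterable

def build_report_prompt(findings: Iterable[str], max_chars: int = 2000) -> str:
    """Combine every finding into one prompt, trimming oversized entries."""
    sections = []
    for index, finding in enumerate(findings, start=1):
        text = finding.strip()
        if len(text) > max_chars:
            text = text[:max_chars] + " [truncated]"
        sections.append(f"Finding {index}:\n{text}")
    evidence = "\n\n".join(sections)
    return (
        "Write a professional SOC incident report with a severity level "
        "and recommended next steps, based on the following evidence.\n\n"
        f"{evidence}"
    )

# Invented findings, for illustration only.
prompt = build_report_prompt([
    "Investigation Result: IP 203.0.113.7 flagged by 2 engines.",
    "Web Research Findings: " + "x" * 5000,
])
print(prompt[:200])
```

The helper takes plain strings, so a node could pass it the `content` of each message in the state.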




## 5 Build the Graph

This connects the logic into a workflow

In [6]:
workflow = StateGraph(AgentState)

# Add Nodes
workflow.add_node("triage", triage_node)
workflow.add_node("responder", responder_node)
workflow.add_node("researcher", research_node)

# Set Entry Point
workflow.set_entry_point("triage")

# Add edges (a fixed, linear flow: triage -> researcher -> responder)
workflow.add_edge("triage", "researcher")
workflow.add_edge("researcher", "responder")
workflow.add_edge("responder", END)

# Compile
app = workflow.compile()
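
The `next_step` field in the state is written by each node, and a router function is one way to let it drive the flow. The sketch below is a hedged illustration rather than part of the notebook: `route_from_state` and `ALLOWED_STEPS` are hypothetical names, and the commented-out call shows how such a router could be wired in with LangGraph's `add_conditional_edges` in place of a fixed edge.

```python
from typing import Any, Dict

ALLOWED_STEPS = {"researcher", "responder"}

def route_from_state(state: Dict[str, Any]) -> str:
    """Return the next node name, falling back to the responder."""
    step = state.get("next_step", "")
    # Unknown or missing values go straight to the report.
    return step if step in ALLOWED_STEPS else "responder"

# With LangGraph, the router would replace the fixed triage edge:
# workflow.add_conditional_edges(
#     "triage",
#     route_from_state,
#     {"researcher": "researcher", "responder": "responder"},
# )

print(route_from_state({"next_step": "researcher"}))
print(route_from_state({"next_step": "unknown"}))
```

Falling back to the responder means a malformed routing value still produces a report instead of stalling the workflow.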

## 6 Run the SOC Analyst

In [7]:
# Simulate an incoming SIEM alert; the triage node extracts the first IPv4 address it finds in the text
alert_log = "Alert: High volume of traffic detected from source 185.220.101.1 to internal database." # The alert text must contain the IP

inputs = {"messages": [HumanMessage(content=alert_log)]}

for output in app.stream(inputs):
    for key, value in output.items():
        print(f"\n--- Current Node: {key} ---")
        print(value["messages"][-1].content)


--- Current Node: triage ---
Investigation Result: 🚨 Alert: IP 185.220.101.1 is MALICIOUS.
Malicious detections: 13 / 95 engines.
Suspicious detections: 1.

--- Current Node: researcher ---
Web Research Findings: [{'title': '185.220.101.1 reported for spam and brute force attacks - CleanTalk', 'url': 'https://cleantalk.org/blacklists/185.220.101.1', 'content': '| Date/time (GMT) | IP | Nickname | Event |\n ---  --- |\n| Dec 06, 2025 21:40:06 | 185.220.101.1 | devsign\\\\\\\\\\ | invalid\\_username |\n| Dec 06, 2025 14:31:46 | 185.220.101.1 | devsign\\\\\\\\\\ | invalid\\_username |\n| Dec 05, 2025 19:20:04 | 185.220.101.1 | devsign\\\\\\\\\\ | invalid\\_username |\n| Dec 05, 2025 14:12:05 | 185.220.101.1 | devsign\\\\\\\\\\ | invalid\\_username |\n| Dec 05, 2025 07:02:06 | 185.220.101.1 | devsign\\\\\\\\\\ | invalid\\_username |\n\n### 185.220.101.1 spam and brute force activity on date [...] Delete the record\n\n# 185.220.101.1 reported for spam and brute force attacks\n\nThe log 