In [None]:
!pip install langchain langchain-google-genai

In [3]:
import os
from collections import defaultdict
from google.colab import userdata
import json

os.environ["GOOGLE_API_KEY"] = userdata.get('GOOGLE_API_KEY')

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.pydantic_v1 import Field
from pydantic import BaseModel

In [2]:
# Define the pydantic model for our structured output
class LogAnalysis(BaseModel):
    severity: str = Field(description="The log's severity. (e.g., 'INFO', 'WARNING', 'ERROR', 'CRITICAL')")
    summary: str = Field(description="A brief, human-readable summary of the log entry.")
    entities: list[str] = Field(description="A list of key entities, such as IPs, user IDs, or error codes.")
    suggested_action: str = Field(description="A one-sentence suggested action for an administrator.")

# Initialize the LLM
# We can use a fast model since the task is simple extraction
llm = ChatGoogleGenerativeAI(model="gemini-flash-latest", temperature=0)

In [9]:
# 1. Define the Pydantic model
class LogAnalysis(BaseModel):
    severity: str = Field(description="The log's severity. (e.g., 'INFO', 'WARNING', 'ERROR', 'CRITICAL')")
    summary: str = Field(description="A brief, human-readable summary of the log entry.")
    entities: list[str] = Field(description="A list of key entities, such as IPs, user IDs, or error codes.")
    suggested_action: str = Field(description="A one-sentence suggested action for an administrator.")

# 2. Initialize the LLM
llm = ChatGoogleGenerativeAI(model="gemini-flash-latest", temperature=0)

# 3. Initialize the parser
parser = JsonOutputParser(pydantic_object=LogAnalysis)

# 4. Create the prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an expert system administrator and log analyst. Your job is to analyze log entries and return a structured JSON analysis. {format_instructions}"),
    ("user", "{log_entry}")
])

# 5. Bind instructions
format_instructions = parser.get_format_instructions()
prompt_with_instructions = prompt.partial(format_instructions=format_instructions)

# 6. Build the analyzer chain
analyzer_chain = prompt_with_instructions | llm | parser

# 7. Define our memory file path
MEMORY_FILE = "log_memory.json"

print("Analyzer chain and memory file path are ready.")

Analyzer chain and memory file path are ready.




In [10]:
def load_memory():
    """Loads the entity memory from a JSON file."""
    try:
        with open(MEMORY_FILE, 'r') as f:
            # Use defaultdict to avoid errors if an entity is new
            return defaultdict(lambda: {'count': 0, 'history': []}, json.load(f))
    except FileNotFoundError:
        # Return a new empty defaultdict if the file doesn't exist
        return defaultdict(lambda: {'count': 0, 'history': []})

def save_memory(memory_data):
    """Saves the entity memory to a JSON file."""
    with open(MEMORY_FILE, 'w') as f:
        json.dump(memory_data, f, indent=4)

In [11]:
def analyze_log_statefully(log_entry):
    """
    Analyzes a log entry using a two-call process to
    correlate it with historical memory.
    """

    # --- 1. Load Memory ---
    memory = load_memory()

    # --- 2. Call 1: Initial Analysis ---
    # Run the cheap analysis to find out *what* is in the log
    try:
        initial_analysis = analyzer_chain.invoke({"log_entry": log_entry})
        entities = initial_analysis.get('entities', [])
    except Exception as e:
        print(f"Error during initial analysis: {e}")
        return

    # --- 3. Python Logic: Update Memory ---
    context_string = ""
    if entities:
        entity_contexts = []
        for entity in entities:
            # Update the count and history for this entity
            memory[entity]['count'] += 1
            memory[entity]['history'].append(initial_analysis['summary'])

            # Create a context summary for the *next* prompt
            entity_contexts.append(
                f"Entity '{entity}' has been seen {memory[entity]['count']} time(s)."
            )

        context_string = "Historical Context: " + " ".join(entity_contexts)

    # --- 4. Call 2: Augmented Analysis ---
    # Now, we run the *same* agent again, but with the new memory context

    augmented_prompt = f"{context_string}\n\nLog Entry: {log_entry}"

    print("--- [Stateful Agent] ---")
    print(f"Augmented Prompt: {augmented_prompt}")

    try:
        stateful_analysis = analyzer_chain.invoke({"log_entry": augmented_prompt})
    except Exception as e:
        print(f"Error during stateful analysis: {e}")
        return

    # --- 5. Save Memory & Return ---
    save_memory(memory)

    # Return the *smarter*, stateful analysis
    return stateful_analysis

In [14]:
log_to_test = "[2025-11-15 17:03:30] [WARNING] (SSH) Failed password attempt for user 'root' from 185.12.33.5"

print(f"--- ANALYZING NEW LOG ---")
print(f"Input: {log_to_test}\n")

final_response = analyze_log_statefully(log_to_test)

if final_response:
    print("\n--- FINAL STATEFUL RESPONSE ---")
    print(f"Severity: {final_response['severity']}")
    print(f"Summary: {final_response['summary']}")
    print(f"Entities: {final_response['entities']}")
    print(f"**Suggested Action**: {final_response['suggested_action']}**")

--- ANALYZING NEW LOG ---

--- [Stateful Agent] ---
Augmented Prompt: Historical Context: Entity 'SSH' has been seen 3 time(s). Entity 'root' has been seen 3 time(s). Entity '185.12.33.5' has been seen 3 time(s).


--- FINAL STATEFUL RESPONSE ---
Summary: A persistent brute-force attack targeting the 'root' user via SSH has been detected. This is the fourth consecutive failed login attempt originating from IP address 185.12.33.5.
Entities: ['SSH', 'root', '185.12.33.5']
**Suggested Action**: Immediately block the source IP address (185.12.33.5) using a firewall or security tool (e.g., fail2ban) due to repeated failed attempts. Review SSH configuration to ensure root login is disabled and strong rate limiting is in place.**


In [None]:
log_to_test = "[2025-11-15 17:03:30] [WARNING] (SSH) Failed password attempt for user 'root' from 185.12.33.5"

print(f"--- ANALYZING NEW LOG ---")
print(f"Input: {log_to_test}\n")

final_response = analyze_log_statefully(log_to_test)

if final_response:
    print("\n--- FINAL STATEFUL RESPONSE ---")
    print(f"Severity: {final_response['severity']}")
    print(f"Summary: {final_response['summary']}")
    print(f"Entities: {final_response['entities']}")
    print(f"**Suggested Action**: {final_response['suggested_action']}**")

In [None]:
log_to_test = "[2025-11-15 17:03:30] [WARNING] (SSH) Failed password attempt for user 'root' from 185.12.33.5"

print(f"--- ANALYZING NEW LOG ---")
print(f"Input: {log_to_test}\n")

final_response = analyze_log_statefully(log_to_test)

if final_response:
    print("\n--- FINAL STATEFUL RESPONSE ---")
    print(f"Severity: {final_response['severity']}")
    print(f"Summary: {final_response['summary']}")
    print(f"Entities: {final_response['entities']}")
    print(f"**Suggested Action**: {final_response['suggested_action']}**")