# Lab 14: C2 Traffic Analysis

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/depalmar/ai_for_the_win/blob/main/notebooks/lab14_c2_traffic.ipynb)

Detect command-and-control communications using ML and AI.

## Learning Objectives
- Beaconing detection algorithms
- DNS tunneling identification
- JA3/JA3S fingerprinting
- LLM-powered traffic classification

In [None]:
!pip install pandas numpy scikit-learn anthropic -q

In [None]:
import pandas as pd
import numpy as np
from typing import List, Dict
from collections import Counter

## 1. Beaconing Detection

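C2 beacons check in at regular intervals with low jitter (little variation between consecutive connections). The score below measures jitter as the standard deviation of the intervals divided by their mean.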

In [None]:
# Generate sample beacon data
np.random.seed(42)  # fixed seed so every run produces the same samples

# Benign traffic: gaps between connections drawn from an exponential
# distribution with a mean of 60, i.e. irregular spacing
normal_intervals = np.random.exponential(scale=60, size=50)

# C2 beacon: a fixed 30-second period plus Gaussian jitter (std dev 2)
beacon_intervals = 30 + np.random.normal(loc=0, scale=2, size=50)

def calculate_beacon_score(intervals: np.ndarray) -> Dict:
    """Score how beacon-like a series of connection intervals is.

    Jitter is the coefficient of variation (standard deviation divided by
    mean) of the intervals. The score is ``1 - 2 * jitter`` clamped at 0,
    so perfectly regular traffic scores 1.0 and any series whose jitter is
    0.5 or more scores 0.0.

    Args:
        intervals: Gaps between consecutive connections. Any array-like of
            numbers is accepted.

    Returns:
        A dict of plain Python values with keys ``mean_interval``,
        ``std_interval``, ``jitter``, ``beacon_score`` (all floats) and
        ``is_beacon`` (bool, true when the unrounded score exceeds 0.7).
        An empty input yields zero mean/std, jitter 1.0, score 0.0 and
        ``is_beacon`` False.
    """
    samples = np.asarray(intervals, dtype=float)

    if samples.size == 0:
        # No intervals means no evidence of periodicity; return a neutral
        # result instead of letting np.mean/np.std emit NaN and warnings.
        return {
            "mean_interval": 0.0,
            "std_interval": 0.0,
            "jitter": 1.0,
            "beacon_score": 0.0,
            "is_beacon": False,
        }

    # Cast to built-in floats so the result prints cleanly and stays
    # JSON-serialisable regardless of the installed NumPy version.
    mean_interval = float(np.mean(samples))
    std_interval = float(np.std(samples))
    # A non-positive mean cannot be normalised; treat it as maximally noisy.
    jitter = std_interval / mean_interval if mean_interval > 0 else 1.0

    # Low jitter = high beacon probability
    beacon_score = max(0.0, 1 - (jitter * 2))

    return {
        "mean_interval": round(mean_interval, 2),
        "std_interval": round(std_interval, 2),
        "jitter": round(jitter, 4),
        "beacon_score": round(beacon_score, 2),
        "is_beacon": bool(beacon_score > 0.7),
    }

# Score both samples: the benign traffic first, then the simulated beacon
for heading, sample in (
    ("Normal Traffic:", normal_intervals),
    ("\nC2 Beacon:", beacon_intervals),
):
    print(heading)
    print(calculate_beacon_score(sample))

## 2. DNS Tunneling Detection

In [None]:
# Sample DNS queries

# Benign lookups: short, human-readable host names
normal_dns = ["www.google.com", "api.github.com", "cdn.example.com"]

# Tunneling-style lookups: a long, encoded-looking payload carried in the
# left-most label of the same parent domain
tunneling_dns = [
    "aGVsbG8gd29ybGQgdGhpcyBpcyBhIHRlc3Q.evil.com",
    "c29tZSBlbmNvZGVkIGRhdGEgaGVyZQ.evil.com",
    "bW9yZSBkYXRhIGV4ZmlsdHJhdGlvbg.evil.com",
]

def analyze_dns_query(query: str) -> Dict:
    """Analyze a DNS query name for tunneling indicators.

    Only the left-most label is inspected, and only when the name has at
    least three dot-separated labels; shorter names are treated as having
    no subdomain. The label's length and Shannon entropy (bits per
    character) are computed, and the query is flagged when the label is
    longer than 30 characters or its entropy exceeds 4.0.

    Args:
        query: The queried domain name, e.g. ``"www.example.com"``.

    Returns:
        A dict of plain Python values with keys ``query`` (str),
        ``subdomain_length`` (int), ``entropy`` (float rounded to two
        decimals) and ``is_suspicious`` (bool).
    """
    labels = query.split('.')
    subdomain = labels[0] if len(labels) > 2 else ""

    # Calculate entropy of subdomain
    if subdomain:
        total = len(subdomain)
        # Sum the negated terms instead of negating the sum: a label made
        # of one repeated character then yields 0.0 rather than -0.0.
        # Counter never stores zero counts, so log2 is always defined.
        entropy = float(
            sum(
                -(count / total) * np.log2(count / total)
                for count in Counter(subdomain).values()
            )
        )
    else:
        entropy = 0.0

    return {
        "query": query,
        "subdomain_length": len(subdomain),
        "entropy": round(entropy, 2),
        # bool() keeps the flag a built-in bool on every branch
        "is_suspicious": bool(len(subdomain) > 30 or entropy > 4.0),
    }

# Run the analyzer over both sample sets, benign first
for heading, queries in (
    ("Normal DNS:", normal_dns),
    ("\nDNS Tunneling:", tunneling_dns),
):
    print(heading)
    for q in queries:
        print(analyze_dns_query(q))

## 3. JA3 Fingerprinting

In [None]:
# Known malicious JA3 hashes (examples)
# Keys are 32-character lowercase hex strings; check_ja3 lower-cases its
# input before the lookup, so keep new entries in lowercase too.
MALICIOUS_JA3 = {
    "e7d705a3286e19ea42f587b344ee6865": "Cobalt Strike",
    "72a589da586844d7f0818ce684948eea": "Metasploit",
    "a0e9f5d64349fb13191bc781f81f42e1": "Trickbot",
}

def check_ja3(ja3_hash: str) -> Dict:
    """Check a JA3 hash against the known-malware table.

    The lookup ignores letter case and surrounding whitespace, because a
    hex digest means the same thing however a tool happens to print it.

    Args:
        ja3_hash: The JA3 fingerprint to look up.

    Returns:
        A dict with ``ja3`` (the hash exactly as passed in), ``status``
        (``"MALICIOUS"`` when listed, otherwise ``"UNKNOWN"``) and
        ``malware_family`` (the listed family name, or ``None``).
    """
    # Normalise only the lookup key; the caller's value is echoed back
    # unchanged so it can still be correlated with the original log line.
    family = MALICIOUS_JA3.get(ja3_hash.strip().lower())
    return {
        "ja3": ja3_hash,
        "status": "MALICIOUS" if family is not None else "UNKNOWN",
        "malware_family": family,
    }

# Test: one hash that is in the table, then one that is not
for candidate in ("e7d705a3286e19ea42f587b344ee6865", "abc123def456"):
    print(check_ja3(candidate))

## 4. Traffic Pattern Classification with ML

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Generate synthetic network flow data
np.random.seed(42)

def generate_flow_data(n_samples: int, is_c2: bool) -> np.ndarray:
    """Generate synthetic network flow features.

    Args:
        n_samples: Number of flows (rows) to generate.
        is_c2: When true, draw beacon-like flows; otherwise draw
            background-traffic flows.

    Returns:
        A float array of shape ``(n_samples, 3)`` whose columns are
        interval, payload size and port, in that order.
    """
    if is_c2:
        # C2: regular intervals, consistent sizes, common web ports
        intervals = np.random.normal(30, 5, n_samples)
        sizes = np.random.normal(500, 50, n_samples)
        ports = np.random.choice([80, 443, 8080], n_samples)
    else:
        # Normal: random intervals, varied sizes, non-privileged ports
        intervals = np.random.exponential(60, n_samples)
        sizes = np.random.exponential(1000, n_samples)
        # randint's upper bound is exclusive, so 65536 is required for
        # the highest valid port (65535) to be drawable.
        ports = np.random.randint(1024, 65536, n_samples)

    return np.column_stack([intervals, sizes, ports])

# Create dataset
X_c2 = generate_flow_data(100, is_c2=True)
X_normal = generate_flow_data(100, is_c2=False)
X = np.vstack([X_c2, X_normal])
# Build the labels from the actual row counts (1 = C2, 0 = normal) so they
# cannot drift out of step if the sample sizes above are changed.
y = np.concatenate([
    np.ones(len(X_c2), dtype=int),
    np.zeros(len(X_normal), dtype=int),
])

# Train classifier
# Seed the split explicitly so the reported accuracy does not depend on how
# much of the global NumPy random stream was consumed earlier, and stratify
# so both classes keep their 50/50 ratio in the train and test sets.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
clf = RandomForestClassifier(n_estimators=50, random_state=42)
clf.fit(X_train, y_train)

# Cast to built-in floats so the dict prints as plain numbers rather than
# NumPy scalar reprs.
feature_names = ['interval', 'size', 'port']
importances = {
    name: round(float(weight), 2)
    for name, weight in zip(feature_names, clf.feature_importances_)
}

print(f"Model Accuracy: {clf.score(X_test, y_test):.2%}")
print(f"Feature Importance: {importances}")

## 5. LLM-Powered Traffic Analysis

In [None]:
from anthropic import Anthropic
import json

def analyze_traffic_with_llm(flows: List[Dict]) -> str:
    """Use an LLM to analyze network traffic patterns.

    The flows are serialised to JSON, embedded in a prompt asking for C2
    indicators and MITRE ATT&CK mappings, and sent as a single user
    message.

    Args:
        flows: Flow records to analyse. Each record must be
            JSON-serialisable, because the list is passed to
            ``json.dumps``.

    Returns:
        The text of the model's reply. If ``flows`` is empty, a fixed
        notice is returned and no API request is made.

    Note:
        ``Anthropic()`` is constructed without arguments, so credentials
        come from the client's own defaults (per the SDK documentation,
        the ``ANTHROPIC_API_KEY`` environment variable).
    """
    if not flows:
        # Nothing to analyse: avoid a pointless API round-trip.
        return "No flows provided for analysis."

    client = Anthropic()

    prompt = f"""
    Analyze these network flows for C2 communication patterns.
    
    FLOWS:
    {json.dumps(flows, indent=2)}
    
    Look for:
    1. Beaconing patterns (regular intervals)
    2. Unusual port usage
    3. Suspicious payload sizes
    4. Known C2 indicators
    
    Provide MITRE ATT&CK technique mappings.
    """

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1500,
        messages=[{"role": "user", "content": prompt}]
    )

    # The reply is a list of content blocks. Concatenate every text block
    # rather than assuming content[0] exists and is text; for the usual
    # single-text-block reply this returns exactly the same string.
    return "".join(
        block.text
        for block in response.content
        if getattr(block, "type", None) == "text"
    )

# Sample flows for analysis
# Three flows between the same pair of hosts on port 443; only the byte
# count and the interval differ from one record to the next.
sample_flows = [
    {"src": "192.168.1.100", "dst": "45.33.32.156", "port": 443, "bytes": size, "interval": gap}
    for size, gap in ((512, 30), (508, 31), (515, 29))
]

# Uncomment to run with API key
# analysis = analyze_traffic_with_llm(sample_flows)
# print(analysis)

## Key Takeaways

1. **Beaconing**: Low jitter in connection intervals
2. **DNS Tunneling**: Long subdomains, high entropy
3. **JA3**: TLS client fingerprints can be matched against hashes of known malware families
4. **ML Classification**: Combine features for detection

## Next Steps
- **Lab 15**: Lateral Movement Detection
- **Lab 16**: Threat Actor Profiling