In [15]:
import pandas as pd
import numpy as np
import requests
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import json
import os

In [16]:
# Local Ollama API Configuration
OLLAMA_URL = "http://localhost:11434/api/generate"
MODEL_NAME_DETECTION = "hf.co/omsh97/Industry_Project_v2_a:Q4_K_M"  # Change to your detection model name
MODEL_NAME_TYPE = "hf.co/omsh97/Industry_Project_v3_a:Q4_K_M"         # Change to your type/classification model name

In [17]:
# Create results directory
results_dir = "test_results_local"
os.makedirs(results_dir, exist_ok=True)

def format_network_data(row):
    """Format network data into a structured input format"""
    return f"""Network Traffic Analysis Request:

Protocol: {row['proto']}
Service: {row['service']}
State: {row['state']}
Duration: {row['dur']} seconds

Traffic Statistics:
Source packets: {row['spkts']}
Destination packets: {row['dpkts']}
Source bytes: {row['sbytes']}
Destination bytes: {row['dbytes']}
Source load: {row['sload']} bits/sec
Destination load: {row['dload']} bits/sec
Source TTL: {row['sttl']}
Destination TTL: {row['dttl']}

Connection Statistics:
TCP round-trip time: {row.get('smean', 0)}
SYN-ACK time: {row.get('dmean', 0)}
ACK data time: {row.get('smeansz', 0)}
Mean packet size (source): {row.get('dmeansz', 0)}
Mean packet size (destination): {row.get('ct_srv_src', 0)}"""

In [18]:
def generate_response(prompt, instruction, model_name):
    try:
        full_prompt = f"{instruction}\n\n{prompt}"
        payload = {
            "model": model_name,
            "prompt": full_prompt,
            "stream": False
        }
        response = requests.post(OLLAMA_URL, json=payload)
        response.raise_for_status()
        data = response.json()
        return data.get('response') or data.get('message', '').strip()
    except Exception as e:
        print(f"Error generating response from Ollama (model: {model_name}): {e}")
        if hasattr(e, 'response') and e.response is not None:
            print(f"Server response: {e.response.text}")
        return None

In [19]:
def analyze_responses(detection_responses, type_responses, true_labels, true_attack_types):
    detection_stats = {
        'total': len(detection_responses),
        'attack_detected': 0,
        'false_positives': 0,
        'false_negatives': 0,
        'true_positives': 0,
        'true_negatives': 0
    }
    type_stats = {
        'total': len(type_responses),
        'attack_types': {},
        'correct_categorizations': 0,
        'incorrect_categorizations': 0,
        'categorization_matrix': {}
    }
    for i, (det_resp, type_resp, true_label, true_type) in enumerate(zip(detection_responses, type_responses, true_labels, true_attack_types)):
        is_attack = any(keyword in (det_resp or '').lower() for keyword in ['shows characteristics', 'investigation', 'suspicious'])
        if is_attack and true_label == 1:
            detection_stats['true_positives'] += 1
        elif not is_attack and true_label == 0:
            detection_stats['true_negatives'] += 1
        elif is_attack and true_label == 0:
            detection_stats['false_positives'] += 1
        elif not is_attack and true_label == 1:
            detection_stats['false_negatives'] += 1
        if is_attack:
            detection_stats['attack_detected'] += 1
        if is_attack and type_resp and true_label == 1:
            predicted_type = type_resp.split('.')[0].strip()
            type_stats['attack_types'][predicted_type] = type_stats['attack_types'].get(predicted_type, 0) + 1
            if true_type not in type_stats['categorization_matrix']:
                type_stats['categorization_matrix'][true_type] = {}
            type_stats['categorization_matrix'][true_type][predicted_type] = type_stats['categorization_matrix'][true_type].get(predicted_type, 0) + 1
            if predicted_type.lower() == true_type.lower():
                type_stats['correct_categorizations'] += 1
            else:
                type_stats['incorrect_categorizations'] += 1
    return detection_stats, type_stats

In [20]:
def visualize_results(detection_stats, type_stats, timestamp):
    plt.figure(figsize=(18, 6))
    plt.subplot(1, 3, 1)
    confusion_matrix = np.array([
        [detection_stats['true_negatives'], detection_stats['false_positives']],
        [detection_stats['false_negatives'], detection_stats['true_positives']]
    ])
    sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Normal', 'Attack'],
                yticklabels=['Normal', 'Attack'])
    plt.title('Attack Detection Confusion Matrix')
    plt.subplot(1, 3, 2)
    attack_types = list(type_stats['attack_types'].keys())
    counts = list(type_stats['attack_types'].values())
    plt.bar(attack_types, counts)
    plt.title('Attack Type Distribution')
    plt.xticks(rotation=45, ha='right')
    plt.subplot(1, 3, 3)
    correct = type_stats['correct_categorizations']
    incorrect = type_stats['incorrect_categorizations']
    plt.bar(['Correct', 'Incorrect'], [correct, incorrect])
    plt.title('Attack Type Categorization Accuracy')
    plt.tight_layout()
    plt.savefig(os.path.join(results_dir, f'results_{timestamp}.png'))
    plt.close()

In [21]:
def main():
    df = pd.read_csv('CSV Files/Training and Testing Sets/UNSW_NB15_testing-set.csv')
    df = df.sample(n=27000)


    detection_responses = []
    type_responses = []
    for _, row in df.iterrows():
        network_data = format_network_data(row)
        detection_instruction = "Analyze the following network traffic data and determine if it shows signs of malicious activity."
        detection_response = generate_response(network_data, detection_instruction, MODEL_NAME_DETECTION)
        detection_responses.append(detection_response)
        if detection_response and any(keyword in detection_response.lower() for keyword in ['shows characteristics', 'investigation', 'suspicious']):
            type_instruction = "Analyze the following network traffic data and determine the specific type of attack being performed."
            type_response = generate_response(network_data, type_instruction, MODEL_NAME_TYPE)
            type_responses.append(type_response)
        else:
            type_responses.append(None)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    detection_stats, type_stats = analyze_responses(detection_responses, type_responses, df['label'].values, df['attack_cat'].values)
    results = {
        'detection_stats': detection_stats,
        'type_stats': type_stats,
        'responses': {
            'detection': detection_responses,
            'type': type_responses
        }
    }
    with open(os.path.join(results_dir, f'results_{timestamp}.json'), 'w') as f:
        json.dump(results, f, indent=2)
    visualize_results(detection_stats, type_stats, timestamp)
    print("\nTest Results Summary:")
    print(f"Total samples: {detection_stats['total']}")
    print(f"Attacks detected: {detection_stats['attack_detected']}")
    print(f"True Positives: {detection_stats['true_positives']}")
    print(f"False Positives: {detection_stats['false_positives']}")
    print(f"True Negatives: {detection_stats['true_negatives']}")
    print(f"False Negatives: {detection_stats['false_negatives']}")
    # Add detection accuracy
    correct = detection_stats['true_positives'] + detection_stats['true_negatives']
    total = detection_stats['total']
    if total > 0:
        accuracy = (correct / total) * 100
        print(f"Attack Detection Accuracy: {accuracy:.2f}%")
    print("\nAttack Type Classification:")
    print(f"Correct categorizations: {type_stats['correct_categorizations']}")
    print(f"Incorrect categorizations: {type_stats['incorrect_categorizations']}")
    if type_stats['correct_categorizations'] + type_stats['incorrect_categorizations'] > 0:
        acc = (type_stats['correct_categorizations'] / (type_stats['correct_categorizations'] + type_stats['incorrect_categorizations'])) * 100
        print(f"Classification accuracy: {acc:.2f}%")
    print("\nAttack Type Distribution:")
    for attack_type, count in type_stats['attack_types'].items():
        print(f"{attack_type}: {count}")
    print("\nCategorization Matrix:")
    for true_type, predictions in type_stats['categorization_matrix'].items():
        print(f"\nTrue type: {true_type}")
        for pred_type, count in predictions.items():
            print(f"  Predicted as {pred_type}: {count}")

if __name__ == "__main__":
    main()


Test Results Summary:
Total samples: 27000
Attacks detected: 11123
True Positives: 9486
False Positives: 1637
True Negatives: 10549
False Negatives: 5328
Attack Detection Accuracy: 74.20%

Attack Type Classification:
Correct categorizations: 0
Incorrect categorizations: 9486
Classification accuracy: 0.00%

Attack Type Distribution:
This network traffic shows characteristics of a DoS attack: 1326
This network traffic shows characteristics of a Generic attack: 5806
This network traffic shows characteristics of a Exploits attack: 1180
This network traffic shows characteristics of a Fuzzers attack: 609
This network traffic shows characteristics of a Reconnaissance attack: 354
This network traffic shows characteristics of a Backdoor attack: 95
This network traffic shows characteristics of a Analysis attack: 112
This network traffic shows characteristics of a Generic Routing Encapsulation (GRE) protocol being used: 1
This network traffic shows characteristics of a Generic Routing Encapsulat

In [None]:
# Path to your results JSON file
json_path = "test_results_local/results_20250511_151814.json"
results_dir = os.path.dirname(json_path)
base_filename = os.path.splitext(os.path.basename(json_path))[0]

with open(json_path, "r") as f:
    results = json.load(f)

detection_stats = results['detection_stats']
type_stats = results['type_stats']

# 1. Confusion Matrix for Detection
plt.figure(figsize=(6, 5))
conf_matrix = np.array([
    [detection_stats['true_negatives'], detection_stats['false_positives']],
    [detection_stats['false_negatives'], detection_stats['true_positives']]
])
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Normal', 'Attack'],
            yticklabels=['Normal', 'Attack'])
plt.title('Attack Detection Confusion Matrix')
plt.tight_layout()
plt.savefig(os.path.join(results_dir, f'{base_filename}_confusion_matrix.png'))
plt.close()

# 2. Attack Type Distribution
plt.figure(figsize=(8, 5))
attack_types = list(type_stats['attack_types'].keys())
counts = list(type_stats['attack_types'].values())
plt.bar(attack_types, counts)
plt.title('Attack Type Distribution')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig(os.path.join(results_dir, f'{base_filename}_type_distribution.png'))
plt.close()

# 3. Attack Type Categorization Accuracy
plt.figure(figsize=(5, 5))
correct = type_stats.get('correct_categorizations', 0)
incorrect = type_stats.get('incorrect_categorizations', 0)
plt.bar(['Correct', 'Incorrect'], [correct, incorrect], color=['green', 'red'])
plt.title('Attack Type Categorization Accuracy')
plt.tight_layout()
plt.savefig(os.path.join(results_dir, f'{base_filename}_type_accuracy.png'))
plt.close()

# 4. Categorization Matrix (if available)
if 'categorization_matrix' in type_stats and type_stats['categorization_matrix']:
    matrix = pd.DataFrame(type_stats['categorization_matrix']).fillna(0).astype(int)
    plt.figure(figsize=(8, 6))
    sns.heatmap(matrix, annot=True, fmt='d', cmap='YlGnBu')
    plt.title('Attack Type Categorization Matrix\n(True label vs Predicted)')
    plt.ylabel('Predicted')
    plt.xlabel('True')
    plt.tight_layout()
    plt.savefig(os.path.join(results_dir, f'{base_filename}_categorization_matrix.png'))
    plt.close()
else:
    print("No categorization matrix found in the results.")

print("All plots saved to:", results_dir)

All plots saved to: test_results_local


  plt.tight_layout()


In [25]:
type_stats['attack_types'].keys()

dict_keys(['This network traffic shows characteristics of a DoS attack', 'This network traffic shows characteristics of a Generic attack', 'This network traffic shows characteristics of a Exploits attack', 'This network traffic shows characteristics of a Fuzzers attack', 'This network traffic shows characteristics of a Reconnaissance attack', 'This network traffic shows characteristics of a Backdoor attack', 'This network traffic shows characteristics of a Analysis attack', 'This network traffic shows characteristics of a Generic Routing Encapsulation (GRE) protocol being used', 'This network traffic shows characteristics of a Generic Routing Encapsulation (GRE) attack'])