# The Parser and Analysis Module for DNS TunViz: research on a DNS tunneling detection visualization tool ready for SOCs

In [43]:
import os
import time
import warnings
from datetime import datetime
import json
from pathlib import Path
from collections import defaultdict, Counter
from typing import Dict, List, Tuple, Any, Optional

# PCAP Analysis
from scapy.all import rdpcap, DNS, DNSQR, DNSRR

# Data Analysis
import joblib
import pandas as pd
import numpy as np
import shap
from scipy.stats import entropy
from tqdm import tqdm

# Data Modeling/Processing
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    roc_curve,
    precision_recall_curve,
    f1_score
)

# Data Visualization
from IPython.display import display, HTML, clear_output
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
import matplotlib.pyplot as plt
import seaborn as sns
import ipywidgets as widgets

In [2]:
# Configuration

# CIC-Bell-DNS-EXF-2021 dataset used for this research: https://www.unb.ca/cic/datasets/dns-exf-2021.html
# Citation: Samaneh Mahdavifar, Amgad Hanafy Salem, Princy Victor, Miguel Garzon, Amir H. Razavi, Natasha Hellberg, Arash Habibi Lashkari, “Lightweight Hybrid Detection of Data Exfiltration using DNS based on Machine Learning”, The 11th IEEE International Conference on Communication and Network Security (ICCNS), Dec. 3-5, 2021, Beijing Jiaotong University, Weihai, China.

BENIGN_PCAP_DIR = "CICBellEXFDNS2021/PCAP/All_benign"        # Directory containing benign PCAP files
MALICIOUS_PCAP_DIR = "CICBellEXFDNS2021/PCAP/All_malicious"  # Directory containing malicious PCAP files
OUTPUT_FILE = "dns_features.json"                            # Output file for extracted features   

warnings.filterwarnings('ignore')

# Feature extraction

Extracts features related to DNS tunneling:

1) Length of full domain
2) Longest subdomain value
3) Number of unique subdomains per apex domain
4) Shannon entropy of subdomain
5) n-gram frequency vectors of subdomains
6) Mean and variance of request size
7) RR-type freqneices: Per-domain frequencies of A/AAAA/TXT/NULL/CNAME/MX/NS/PTR records
8) Mean and variance of TTLs over responses

In [46]:
class DNSFeatureExtractor:
    def __init__(self, ngram_range: Tuple[int, int] = (2, 3), max_features: int = 100):
        self.ngram_range = ngram_range
        self.vectorizer = CountVectorizer(
            analyzer='char',
            ngram_range=ngram_range,
            max_features=max_features
        )
        self.rr_types = ['A', 'AAAA', 'TXT', 'NULL', 'CNAME', 'MX', 'NS', 'PTR']
        
    def extract_subdomains(self, domain: str) -> List[str]:
        parts = domain.rstrip('.').split('.')
        if len(parts) <= 2:
            return []
        return parts[:-2]
    
    def calculate_shannon_entropy(self, text: str) -> float:
        if not text:
            return 0.0
        counts = Counter(text)
        probabilities = [count / len(text) for count in counts.values()]
        return entropy(probabilities, base=2)
    
    def extract_features_from_pcap(self, pcap_path: str) -> List[Dict]:
        packets = rdpcap(pcap_path)
        
        domain_data = defaultdict(lambda: {
            'queries': [],
            'subdomains': [],
            'request_sizes': [],
            'rr_counts': Counter(),
            'ttls': []
        })
        
        for pkt_num, packet in enumerate(tqdm(packets, desc="  Processing packets", leave=False)):
            try:
                if not packet.haslayer(DNS):
                    continue
                    
                dns = packet[DNS]
                
                if dns.qd:
                    try:
                        if hasattr(dns.qd, 'qname'):
                            qname_raw = dns.qd.qname
                            if isinstance(qname_raw, bytes):
                                qname = qname_raw.decode('utf-8', errors='ignore')
                            else:
                                qname = str(qname_raw)
                            
                            if not qname or qname == '.':
                                continue
                            
                            parts = qname.rstrip('.').split('.')
                            if len(parts) < 2:
                                continue
                            
                            apex = '.'.join(parts[-2:])
                            
                            domain_data[apex]['queries'].append(qname)
                            domain_data[apex]['request_sizes'].append(len(packet))
                            
                            subdomains = self.extract_subdomains(qname)
                            domain_data[apex]['subdomains'].extend(subdomains)
                    except Exception as e:
                        tqdm.write(f"  Warning: Packet {pkt_num} query parsing failed: {e}")
                        continue
                
                if dns.an:
                    try:
                        an_records = dns.an if isinstance(dns.an, list) else [dns.an]
                        for rr in an_records:
                            if hasattr(rr, 'rrname'):
                                rrname_raw = rr.rrname
                                if isinstance(rrname_raw, bytes):
                                    rrname = rrname_raw.decode('utf-8', errors='ignore')
                                else:
                                    rrname = str(rrname_raw)
                                
                                parts = rrname.rstrip('.').split('.')
                                parts = [p for p in parts if p]  # Remove empty parts
                                if len(parts) < 2:
                                    continue
                                    
                                apex = '.'.join(parts[-2:])
                                rr_type = self.get_rr_type(rr.type)
                                domain_data[apex]['rr_counts'][rr_type] += 1
                                
                                if hasattr(rr, 'ttl'):
                                    domain_data[apex]['ttls'].append(rr.ttl)
                    except Exception as e:
                        tqdm.write(f"  Warning: Packet {pkt_num} answer parsing failed: {e}")
                        continue
                        
            except Exception as e:
                tqdm.write(f"  Warning: Packet {pkt_num} processing failed: {e}")
                continue
        
        return self.compute_features(domain_data)
    
    def extract_features_with_timestamps(self, pcap_path: str) -> Tuple[List[Dict], List[datetime]]:
        """Extract features and timestamps from PCAP."""
        packets = rdpcap(pcap_path)
        
        domain_data = defaultdict(lambda: {
            'queries': [], 'subdomains': [], 'request_sizes': [],
            'rr_counts': Counter(), 'ttls': [], 'timestamps': []
        })
        
        for packet in tqdm(packets, desc="Processing packets"):
            try:
                if not packet.haslayer(DNS):
                    continue
                
                dns = packet[DNS]
                timestamp = datetime.fromtimestamp(float(packet.time))
                
                if dns.qd and hasattr(dns.qd, 'qname'):
                    qname_raw = dns.qd.qname
                    qname = qname_raw.decode('utf-8', errors='ignore') if isinstance(qname_raw, bytes) else str(qname_raw)
                    
                    if qname and qname != '.':
                        parts = qname.rstrip('.').split('.')
                        if len(parts) >= 2:
                            apex = '.'.join(parts[-2:])
                            domain_data[apex]['queries'].append(qname)
                            domain_data[apex]['request_sizes'].append(len(packet))
                            domain_data[apex]['timestamps'].append(timestamp)
                            domain_data[apex]['subdomains'].extend(self.extract_subdomains(qname))
                
                if dns.an:
                    an_records = dns.an if isinstance(dns.an, list) else [dns.an]
                    for rr in an_records:
                        if hasattr(rr, 'rrname'):
                            rrname_raw = rr.rrname
                            rrname = rrname_raw.decode('utf-8', errors='ignore') if isinstance(rrname_raw, bytes) else str(rrname_raw)
                            parts = [p for p in rrname.rstrip('.').split('.') if p]
                            if len(parts) >= 2:
                                apex = '.'.join(parts[-2:])
                                domain_data[apex]['rr_counts'][self.get_rr_type(rr.type)] += 1
                                if hasattr(rr, 'ttl'):
                                    domain_data[apex]['ttls'].append(rr.ttl)
            except:
                continue
        
        features = []
        timestamps = []
        
        for apex, data in domain_data.items():
            if data['queries']:
                feature_dict = {
                    'apex_domain': apex,
                    'avg_domain_length': float(np.mean([len(q) for q in data['queries']])),
                    'max_domain_length': int(max([len(q) for q in data['queries']])),
                    'longest_subdomain': int(max([len(s) for s in data['subdomains']])) if data['subdomains'] else 0,
                    'unique_subdomain_count': int(len(set(data['subdomains']))),
                    'avg_subdomain_entropy': float(np.mean([self.calculate_shannon_entropy(s) for s in data['subdomains']])) if data['subdomains'] else 0.0,
                    'request_size_mean': float(np.mean(data['request_sizes'])) if data['request_sizes'] else 0.0,
                    'request_size_var': float(np.var(data['request_sizes'])) if data['request_sizes'] else 0.0,
                    'ttl_mean': float(np.mean(data['ttls'])) if data['ttls'] else 0.0,
                    'ttl_var': float(np.var(data['ttls'])) if data['ttls'] else 0.0,
                }
                
                total_rr = sum(data['rr_counts'].values())
                for rr_type in self.rr_types:
                    feature_dict[f'rr_freq_{rr_type}'] = float(data['rr_counts'][rr_type] / total_rr if total_rr > 0 else 0)
                
                features.append(feature_dict)
                timestamps.append(min(data['timestamps']) if data['timestamps'] else datetime.now())
        
        return features, timestamps
    
    def get_rr_type(self, type_code: int) -> str:
        type_map = {1: 'A', 28: 'AAAA', 16: 'TXT', 10: 'NULL', 
                    5: 'CNAME', 15: 'MX', 2: 'NS', 12: 'PTR'}
        return type_map.get(type_code, 'OTHER')
    
    def compute_features(self, domain_data: Dict) -> Dict:
        features = []
        
        for apex, data in domain_data.items():
            if not data['queries']:
                continue
            
            feature_dict = {
                'apex_domain': apex,
                'avg_domain_length': float(np.mean([len(q) for q in data['queries']])),
                'max_domain_length': int(max([len(q) for q in data['queries']])),
                'longest_subdomain': int(max([len(s) for s in data['subdomains']])) if data['subdomains'] else 0,
                'unique_subdomain_count': int(len(set(data['subdomains']))),
                'avg_subdomain_entropy': float(np.mean([
                    self.calculate_shannon_entropy(s) for s in data['subdomains']
                ])) if data['subdomains'] else 0.0,
                'request_size_mean': float(np.mean(data['request_sizes'])) if data['request_sizes'] else 0.0,
                'request_size_var': float(np.var(data['request_sizes'])) if data['request_sizes'] else 0.0,
                'ttl_mean': float(np.mean(data['ttls'])) if data['ttls'] else 0.0,
                'ttl_var': float(np.var(data['ttls'])) if data['ttls'] else 0.0,
            }
            
            total_rr = sum(data['rr_counts'].values())
            for rr_type in self.rr_types:
                feature_dict[f'rr_freq_{rr_type}'] = float(
                    data['rr_counts'][rr_type] / total_rr if total_rr > 0 else 0
                )
            
            features.append(feature_dict)
        
        return features
    
    def add_ngram_features(self, features: List[Dict]) -> List[Dict]:
        all_subdomains = []
        subdomain_indices = []
        
        for idx, feature in enumerate(features):
            apex = feature['apex_domain']
            subdomain_text = apex.replace('.', '')
            all_subdomains.append(subdomain_text)
            subdomain_indices.append(idx)
        
        if all_subdomains:
            ngram_matrix = self.vectorizer.fit_transform(all_subdomains).toarray()
            feature_names = self.vectorizer.get_feature_names_out()
            
            for idx, ngram_vec in zip(subdomain_indices, ngram_matrix):
                for fname, value in zip(feature_names, ngram_vec):
                    features[idx][f'ngram_{fname}'] = int(value)
        
        return features
    
    def process_directory(self, pcap_dir: str, label: str) -> List[Dict]:
        all_features = []
        
        pcap_files = list(Path(pcap_dir).glob('*.pcap*'))
        
        for pcap_file in tqdm(pcap_files, desc=f"Processing {label} PCAPs"):
            try:
                features = self.extract_features_from_pcap(str(pcap_file))
                if features:
                    for feature in features:
                        feature['label'] = label
                        feature['source_file'] = pcap_file.name
                    all_features.extend(features)
                    tqdm.write(f"  {pcap_file.name}: Extracted {len(features)} domain features")
                else:
                    tqdm.write(f"  {pcap_file.name}: No valid DNS features found")
            except Exception as e:
                import traceback
                tqdm.write(f"Error processing {pcap_file}: {e}")
                tqdm.write(traceback.format_exc())
        
        return all_features

In [4]:
extractor = DNSFeatureExtractor(ngram_range=(2, 3), max_features=50)

print("\n" + "="*60)
print("FEATURE EXTRACTION")
print("="*60)

benign_features = extractor.process_directory(BENIGN_PCAP_DIR, label='benign')
print(f"\nBenign features extracted: {len(benign_features)}")

malicious_features = extractor.process_directory(MALICIOUS_PCAP_DIR, label='malicious')
print(f"Malicious features extracted: {len(malicious_features)}")

print("\nCombining and adding n-gram features...")
all_features = benign_features + malicious_features
all_features = extractor.add_ngram_features(all_features)

print(f"Saving {len(all_features)} total features to {OUTPUT_FILE}...")
with open(OUTPUT_FILE, 'w') as f:
    json.dump(all_features, f, indent=2)

print("\n" + "="*60)
print(f"COMPLETE - Extracted {len(benign_features)} benign and {len(malicious_features)} malicious samples")
print(f"Features saved to {OUTPUT_FILE}")
print("="*60)


FEATURE EXTRACTION


Processing benign PCAPs:  17%|█▋        | 1/6 [00:38<03:14, 38.95s/it]

  benign_2.pcap: Extracted 15324 domain features


Processing benign PCAPs:  33%|███▎      | 2/6 [01:40<03:28, 52.16s/it]

  benign_1.pcap: Extracted 21629 domain features


Processing benign PCAPs:  50%|█████     | 3/6 [02:14<02:11, 43.90s/it]

  benign_heavy_3.pcap: Extracted 12120 domain features


Processing benign PCAPs:  67%|██████▋   | 4/6 [02:40<01:13, 36.87s/it]

  benign_heavy_2.pcap: Extracted 8691 domain features


Processing benign PCAPs:  83%|████████▎ | 5/6 [03:08<00:33, 33.83s/it]

  benign.pcap: Extracted 10365 domain features


Processing benign PCAPs: 100%|██████████| 6/6 [03:37<00:00, 36.23s/it]


  benign_heavy_1.pcap: Extracted 10390 domain features

Benign features extracted: 78519


Processing malicious PCAPs:   8%|▊         | 1/12 [00:34<06:20, 34.57s/it]

  heavy_exe.pcap: Extracted 16 domain features


Processing malicious PCAPs:  17%|█▋        | 2/12 [00:37<02:40, 16.03s/it]

  light_video.pcap: Extracted 8 domain features


Processing malicious PCAPs:  25%|██▌       | 3/12 [00:39<01:26,  9.63s/it]

  light_text.pcap: Extracted 9 domain features


Processing malicious PCAPs:  33%|███▎      | 4/12 [00:53<01:29, 11.23s/it]

  light_compressed.pcap: Extracted 13 domain features


Processing malicious PCAPs:  42%|████▏     | 5/12 [01:04<01:18, 11.21s/it]

  light_audio.pcap: Extracted 12 domain features


Processing malicious PCAPs:  50%|█████     | 6/12 [01:40<01:57, 19.58s/it]

  heavy_audio.pcap: Extracted 15 domain features


Processing malicious PCAPs:  58%|█████▊    | 7/12 [01:49<01:20, 16.09s/it]

  light_exe.pcap: Extracted 10 domain features


Processing malicious PCAPs:  67%|██████▋   | 8/12 [02:21<01:24, 21.24s/it]

  heavy_compressed.pcap: Extracted 17 domain features


Processing malicious PCAPs:  75%|███████▌  | 9/12 [02:21<00:44, 14.70s/it]

  light_image.pcap: Extracted 4 domain features


Processing malicious PCAPs:  83%|████████▎ | 10/12 [03:16<00:54, 27.07s/it]

  heavy_text.pcap: Extracted 20 domain features


Processing malicious PCAPs:  92%|█████████▏| 11/12 [03:58<00:31, 31.69s/it]

  heavy_image.pcap: Extracted 14 domain features


Processing malicious PCAPs: 100%|██████████| 12/12 [04:37<00:00, 23.11s/it]


  heavy_video.pcap: Extracted 15 domain features
Malicious features extracted: 153

Combining and adding n-gram features...
Saving 78672 total features to dns_features.json...

COMPLETE - Extracted 78519 benign and 153 malicious samples
Features saved to dns_features.json


# Model creation

Between SVM and Random Forests being the most accurate models for this kind of data, this research chooses to use a Random Forest due to its easier implementation and interpretability.

Below is the process used for modelling. Steps:

1) Loading the features from the json file; extracted from above
2) Data split into training and evaluation data sets; model trained and evaluated
3) Plotting visualizations for evaluation

In [21]:
class DNSTunnelingDetector:
    def __init__(self, random_state: int = 42):
        self.random_state = random_state
        self.model: RandomForestClassifier = None
        self.scaler: StandardScaler = StandardScaler()
        self.feature_names: List[str] = None
        self.label_mapping = {"benign": 0, "malicious": 1}
        
    def load_data(self, json_file_path: str) -> pd.DataFrame:
        with open(json_file_path, 'r') as f:
            data = json.load(f)
        
        df = pd.DataFrame(data)
        print(f"Loaded {len(df)} samples from {json_file_path}")
        print(f"Label distribution:\n{df['label'].value_counts()}")
        
        return df
    
    def prepare_features(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
        exclude_columns = ['apex_domain', 'source_file', 'label']
        feature_columns = [col for col in df.columns if col not in exclude_columns]
        
        self.feature_names = feature_columns
        
        X = df[feature_columns].values
        y = df['label'].map(self.label_mapping).values
        
        print(f"\nFeature matrix shape: {X.shape}")
        print(f"Number of features: {len(feature_columns)}")
        
        return X, y
    
    def train(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        optimize_hyperparameters: bool = False
    ) -> None:
        X_train_scaled = self.scaler.fit_transform(X_train)
        
        if optimize_hyperparameters:
            print("\nOptimizing hyperparameters...")
            self.model = self._optimize_hyperparameters(X_train_scaled, y_train)
        else:
            self.model = RandomForestClassifier(
                n_estimators=100,
                max_depth=20,
                min_samples_split=5,
                min_samples_leaf=2,
                max_features='sqrt',
                random_state=self.random_state,
                n_jobs=-1,
                class_weight='balanced'
            )
            self.model.fit(X_train_scaled, y_train)
        
        print("\nTraining completed!")
        print(f"Number of trees: {self.model.n_estimators}")
        print(f"Max depth: {self.model.max_depth}")

    def balance_dataset(
        self,
        df: pd.DataFrame,
        downsample_ratio: float = 0.1,
        random_state: int = None
    ) -> pd.DataFrame:
        if random_state is None:
            random_state = self.random_state
        
        benign_df = df[df['label'] == 'benign']
        malicious_df = df[df['label'] == 'malicious']
        
        print(f"\nOriginal dataset distribution:")
        print(f"  Benign: {len(benign_df)}")
        print(f"  Malicious: {len(malicious_df)}")
        print(f"  Ratio (benign:malicious): {len(benign_df)/len(malicious_df):.1f}:1")
        
        benign_sample_size = int(len(benign_df) * downsample_ratio)
        benign_downsampled = benign_df.sample(
            n=benign_sample_size,
            random_state=random_state
        )
        
        balanced_df = pd.concat([benign_downsampled, malicious_df], ignore_index=True)
        balanced_df = balanced_df.sample(frac=1, random_state=random_state).reset_index(drop=True)
        
        print(f"\nBalanced dataset distribution:")
        print(f"  Benign: {len(benign_downsampled)}")
        print(f"  Malicious: {len(malicious_df)}")
        print(f"  Ratio (benign:malicious): {len(benign_downsampled)/len(malicious_df):.1f}:1")
        print(f"  Total samples: {len(balanced_df)}")
        print(f"  Reduction: {(1 - len(balanced_df)/len(df)) * 100:.1f}%")
        
        return balanced_df
    
    def _optimize_hyperparameters(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray
    ) -> RandomForestClassifier:
        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [10, 20, 30, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'max_features': ['sqrt', 'log2']
        }
        
        rf = RandomForestClassifier(
            random_state=self.random_state,
            n_jobs=-1,
            class_weight='balanced'
        )
        
        grid_search = GridSearchCV(
            rf,
            param_grid,
            cv=5,
            scoring='f1',
            n_jobs=-1,
            verbose=1
        )
        
        grid_search.fit(X_train, y_train)
        print(f"Best parameters: {grid_search.best_params_}")
        print(f"Best F1 score: {grid_search.best_score_:.4f}")
        
        return grid_search.best_estimator_
    
    def evaluate(
        self,
        X_test: np.ndarray,
        y_test: np.ndarray
    ) -> Dict[str, Any]:
        X_test_scaled = self.scaler.transform(X_test)
        
        y_pred = self.model.predict(X_test_scaled)
        y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
        
        results = {
            'classification_report': classification_report(
                y_test,
                y_pred,
                target_names=['benign', 'malicious']
            ),
            'confusion_matrix': confusion_matrix(y_test, y_pred),
            'roc_auc': roc_auc_score(y_test, y_pred_proba),
            'f1_score': f1_score(y_test, y_pred),
            'y_pred': y_pred,
            'y_pred_proba': y_pred_proba
        }
        
        print("\n" + "="*60)
        print("EVALUATION RESULTS")
        print("="*60)
        print(f"\nROC-AUC Score: {results['roc_auc']:.4f}")
        print(f"F1 Score: {results['f1_score']:.4f}")
        print("\nClassification Report:")
        print(results['classification_report'])
        print("\nConfusion Matrix:")
        print(results['confusion_matrix'])
        
        return results
    
    def cross_validate(self, X: np.ndarray, y: np.ndarray, cv: int = 5) -> None:
        X_scaled = self.scaler.fit_transform(X)
        
        scores = cross_val_score(
            self.model,
            X_scaled,
            y,
            cv=cv,
            scoring='f1',
            n_jobs=-1
        )
        
        print(f"\nCross-validation F1 scores: {scores}")
        print(f"Mean F1: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
    
    def get_feature_importance(self, top_n: int = 20) -> pd.DataFrame:
        importance_df = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)
        
        print(f"\nTop {top_n} Most Important Features:")
        print(importance_df.head(top_n).to_string(index=False))
        
        return importance_df
    
    def plot_results(
        self,
        y_test: np.ndarray,
        y_pred_proba: np.ndarray,
        importance_df: pd.DataFrame,
        output_dir: str = "./outputs"
    ) -> None:
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Calculate metrics for plotting
        fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
        auc = roc_auc_score(y_test, y_pred_proba)
        precision, recall, _ = precision_recall_curve(y_test, y_pred_proba)
        top_features = importance_df.head(15)

        # Plot 1: ROC Curve (individual)
        fig1, ax1 = plt.subplots(figsize=(8, 6))
        ax1.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc:.4f})')
        ax1.plot([0, 1], [0, 1], 'k--', label='Random Classifier')
        ax1.set_xlabel('False Positive Rate')
        ax1.set_ylabel('True Positive Rate')
        ax1.set_title('ROC Curve')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(f"{output_dir}/roc_curve.png", dpi=300, bbox_inches='tight')
        plt.close(fig1)

        # Plot 2: Precision-Recall Curve (individual)
        fig2, ax2 = plt.subplots(figsize=(8, 6))
        ax2.plot(recall, precision)
        ax2.set_xlabel('Recall')
        ax2.set_ylabel('Precision')
        ax2.set_title('Precision-Recall Curve')
        ax2.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(f"{output_dir}/precision_recall_curve.png", dpi=300, bbox_inches='tight')
        plt.close(fig2)

        # Plot 3: Feature Importance (individual)
        fig3, ax3 = plt.subplots(figsize=(8, 6))
        ax3.barh(range(len(top_features)), top_features['importance'])
        ax3.set_yticks(range(len(top_features)))
        ax3.set_yticklabels(top_features['feature'])
        ax3.set_xlabel('Importance')
        ax3.set_title('Top 15 Feature Importances')
        ax3.invert_yaxis()
        plt.tight_layout()
        plt.savefig(f"{output_dir}/feature_importance.png", dpi=300, bbox_inches='tight')
        plt.close(fig3)

        # Plot 4: Prediction Distribution (individual)
        fig4, ax4 = plt.subplots(figsize=(8, 6))
        ax4.hist(y_pred_proba[y_test == 0], bins=50, alpha=0.5, label='Benign')
        ax4.hist(y_pred_proba[y_test == 1], bins=50, alpha=0.5, label='Malicious')
        ax4.set_xlabel('Predicted Probability')
        ax4.set_ylabel('Frequency')
        ax4.set_title('Prediction Distribution')
        ax4.legend()
        plt.tight_layout()
        plt.savefig(f"{output_dir}/prediction_distribution.png", dpi=300, bbox_inches='tight')
        plt.close(fig4)

        # Combined plot (all 4 plots together)
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))

        # ROC Curve
        axes[0, 0].plot(fpr, tpr, label=f'ROC Curve (AUC = {auc:.4f})')
        axes[0, 0].plot([0, 1], [0, 1], 'k--', label='Random Classifier')
        axes[0, 0].set_xlabel('False Positive Rate')
        axes[0, 0].set_ylabel('True Positive Rate')
        axes[0, 0].set_title('ROC Curve')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # Precision-Recall Curve
        axes[0, 1].plot(recall, precision)
        axes[0, 1].set_xlabel('Recall')
        axes[0, 1].set_ylabel('Precision')
        axes[0, 1].set_title('Precision-Recall Curve')
        axes[0, 1].grid(True, alpha=0.3)

        # Feature Importance
        axes[1, 0].barh(range(len(top_features)), top_features['importance'])
        axes[1, 0].set_yticks(range(len(top_features)))
        axes[1, 0].set_yticklabels(top_features['feature'])
        axes[1, 0].set_xlabel('Importance')
        axes[1, 0].set_title('Top 15 Feature Importances')
        axes[1, 0].invert_yaxis()

        # Prediction Distribution
        axes[1, 1].hist(y_pred_proba[y_test == 0], bins=50, alpha=0.5, label='Benign')
        axes[1, 1].hist(y_pred_proba[y_test == 1], bins=50, alpha=0.5, label='Malicious')
        axes[1, 1].set_xlabel('Predicted Probability')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].set_title('Prediction Distribution')
        axes[1, 1].legend()

        plt.tight_layout()
        plt.savefig(f"{output_dir}/dns_tunneling_results.png", dpi=300, bbox_inches='tight')
        plt.close(fig)

        print(f"\nPlots saved to {output_dir}:")
        print(f"  - Combined: dns_tunneling_results.png")
        print(f"  - Individual: roc_curve.png")
        print(f"  - Individual: precision_recall_curve.png")
        print(f"  - Individual: feature_importance.png")
        print(f"  - Individual: prediction_distribution.png")
    
    def save_model(self, model_path: str = "./outputs/dns_model.pkl") -> None:
        Path(model_path).parent.mkdir(parents=True, exist_ok=True)
        
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'label_mapping': self.label_mapping
        }
        
        joblib.dump(model_data, model_path)
        print(f"\nModel saved to {model_path}")
    
    def load_model(self, model_path: str) -> None:
        model_data = joblib.load(model_path)
        
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
        self.label_mapping = model_data['label_mapping']
        
        print(f"Model loaded from {model_path}")
    
    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        probabilities = self.model.predict_proba(X_scaled)[:, 1]
        
        return predictions, probabilities

In [None]:
detector = DNSTunnelingDetector(random_state=42)

df = detector.load_data(OUTPUT_FILE)

df = detector.balance_dataset(df, downsample_ratio=0.1)

X, y = detector.prepare_features(df)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"\nTraining set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")

detector.train(X_train, y_train, optimize_hyperparameters=False)

results = detector.evaluate(X_test, y_test)

importance_df = detector.get_feature_importance(top_n=20)

detector.plot_results(y_test, results['y_pred_proba'], importance_df)

detector.save_model()

print("\n" + "="*60)
print("Training pipeline completed successfully!")
print("="*60)

Loaded 78672 samples from dns_features.json
Label distribution:
label
benign       78519
malicious      153
Name: count, dtype: int64

Original dataset distribution:
  Benign: 78519
  Malicious: 153
  Ratio (benign:malicious): 513.2:1

Balanced dataset distribution:
  Benign: 7851
  Malicious: 153
  Ratio (benign:malicious): 51.3:1
  Total samples: 8004
  Reduction: 89.8%

Feature matrix shape: (8004, 67)
Number of features: 67

Training set size: 6403
Test set size: 1601

Training completed!
Number of trees: 100
Max depth: 20

EVALUATION RESULTS

ROC-AUC Score: 0.9984
F1 Score: 0.9032

Classification Report:
              precision    recall  f1-score   support

      benign       1.00      1.00      1.00      1570
   malicious       0.90      0.90      0.90        31

    accuracy                           1.00      1601
   macro avg       0.95      0.95      0.95      1601
weighted avg       1.00      1.00      1.00      1601


Confusion Matrix:
[[1567    3]
 [   3   28]]

Top 20 Mo

# TunViz

TunViz is a Dashboard which can do the following tasks:

- Intake a new PCAP file and obtain the DNS queries for feature extraction and analysis
- Analyze all relevant DNS queries using a previously trained Random Forest Classifier model to flag for benign or malicious DNS queries
- Create a table for each and every query, allowing the user to filter the data down by time of the query and by type of data present in each field
- Allow users to click on each entry in the table and display a SHAP Force Plot that intuitively allows someone with little machine learning or statistical background to understand why the model made the binary decision it made for that particular query
- For all queries in the filtered-down table, create a parallel sets (alluvial) plot that shows how for each entry the features extracted and the final decision. The categories will be based on the features and the categories will be based on the threshold in the random forest that made that specific tree vote the way it did. For example, for the trees that evaluate the avg_domain_length for their decision, find the threshold that makes it vote "benign" or "malicious" and route the query through the corresponding category in the parallel sets (alluvial) plot.

In [None]:
class DNSDashboard:
    def __init__(self, model_path: str = "./outputs/dns_model.pkl"):
        """Initialize the DNS Dashboard with model and feature extractor."""
        self.model_data = None
        self.model = None
        self.scaler = None
        self.feature_names = None
        
        # Load model
        self.load_model(model_path)
        
        # Initialize feature extractor
        self.extractor = DNSFeatureExtractor()
        
        # Data storage
        self.current_data = None
        self.filtered_data = None
        self.shap_explainer = None
        self.shap_values = None
        
    def load_model(self, model_path: str) -> None:
        """Load the trained model."""
        self.model_data = joblib.load(model_path)
        self.model = self.model_data['model']
        self.scaler = self.model_data['scaler']
        self.feature_names = self.model_data['feature_names']
        print(f"Model loaded from {model_path}")
    
    def analyze_pcap(self, pcap_path: str) -> pd.DataFrame:
        """Analyze a PCAP file and return results."""
        print("Extracting DNS features...")
        features, timestamps = self.extractor.extract_features_with_timestamps(pcap_path)
        
        if not features:
            print("No DNS queries found")
            return pd.DataFrame()
        
        print(f"Found {len(features)} DNS queries")
        
        # Create DataFrame
        df = pd.DataFrame(features)
        df['timestamp'] = timestamps
        
        # Add n-gram features
        print("Adding n-gram features...")
        df = self.add_ngram_features_to_df(df)
        
        # Make predictions
        print("Running predictions...")
        X = df[self.feature_names].values
        X_scaled = self.scaler.transform(X)
        
        predictions = self.model.predict(X_scaled)
        probabilities = self.model.predict_proba(X_scaled)
        
        df['prediction'] = ['Malicious' if p == 1 else 'Benign' for p in predictions]
        df['malicious_prob'] = probabilities[:, 1]
        
        # Calculate SHAP values
        print("Computing SHAP explanations...")
        self.compute_shap_values(X_scaled)
        
        self.current_data = df
        self.filtered_data = df.copy()
        
        print("Analysis complete!")
        return df
    
    def add_ngram_features_to_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add n-gram features to DataFrame."""
        for feature in self.feature_names:
            if feature.startswith('ngram_') and feature not in df.columns:
                df[feature] = 0
        
        all_features = df.to_dict('records')
        enhanced_features = self.extractor.add_ngram_features(all_features)
        
        for i, feature_dict in enumerate(enhanced_features):
            for key, value in feature_dict.items():
                if key.startswith('ngram_'):
                    df.at[i, key] = value
        
        return df
    
    def compute_shap_values(self, X_scaled: np.ndarray) -> None:
        """Compute SHAP values."""
        self.shap_explainer = shap.TreeExplainer(self.model)
        self.shap_values = self.shap_explainer.shap_values(X_scaled)
        
        if isinstance(self.shap_values, list):
            self.shap_values = self.shap_values[1]
    
    def create_interactive_table(self, max_rows: int = 20) -> widgets.VBox:
        """Create an interactive table with SHAP explanations."""
        if self.filtered_data is None:
            return widgets.VBox([widgets.HTML("No data available")])
        
        # Summary stats
        total = len(self.filtered_data)
        malicious = len(self.filtered_data[self.filtered_data['prediction'] == 'Malicious'])
        benign = total - malicious
        
        stats_html = f"""
        <div style="background: #f0f0f0; padding: 15px; border-radius: 8px; margin-bottom: 20px;">
            <h3>Summary Statistics</h3>
            <div style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 20px;">
                <div>
                    <strong>Total Queries:</strong> {total}
                </div>
                <div>
                    <strong>Malicious:</strong> <span style="color: red;">{malicious} ({malicious/total*100:.1f}%)</span>
                </div>
                <div>
                    <strong>Benign:</strong> <span style="color: green;">{benign} ({benign/total*100:.1f}%)</span>
                </div>
            </div>
        </div>
        """
        
        # Create table
        display_cols = ['timestamp', 'apex_domain', 'prediction', 'malicious_prob']
        table_data = self.filtered_data[display_cols].head(max_rows).copy()
        table_data['malicious_prob'] = table_data['malicious_prob'].round(3)
        
        table_html = table_data.to_html(classes='table', index=True)
        
        # Create output for SHAP plots
        shap_output = widgets.Output()
        
        # Index selector
        index_selector = widgets.IntSlider(
            value=0,
            min=0,
            max=min(len(self.filtered_data)-1, max_rows-1),
            description='Query Index:',
            continuous_update=False
        )
        
        def update_shap(change):
            with shap_output:
                clear_output(wait=True)
                self.display_shap_force_plot(self.filtered_data.index[change.new])
        
        index_selector.observe(update_shap, 'value')
        
        # Initial SHAP plot
        with shap_output:
            self.display_shap_force_plot(self.filtered_data.index[0])
        
        return widgets.VBox([
            widgets.HTML(stats_html),
            widgets.HTML("<h4>DNS Queries Table</h4>"),
            widgets.HTML(table_html),
            widgets.HTML("<h4>SHAP Explanation (adjust slider to select query)</h4>"),
            index_selector,
            shap_output
        ])
    
    def display_shap_force_plot(self, index: int) -> None:
        """Display SHAP force plot for a specific query."""
        original_idx = self.current_data.index.get_loc(index)
        shap_vals = self.shap_values[original_idx]
        # Handle 2D SHAP values (binary classification)
        if len(shap_vals.shape) > 1 and shap_vals.shape[1] == 2:
            shap_vals = shap_vals[:, 1]  # Use malicious class SHAP values
        feature_vals = self.current_data.loc[index, self.feature_names].values
        
        # Get top features
        abs_shap = np.abs(shap_vals)
        sorted_idx = np.argsort(abs_shap)[::-1][:10]
        sorted_idx = list(sorted_idx.flat)
        
        # Use numpy indexing to extract arrays, then convert to lists
        shap_values_sorted = shap_vals[sorted_idx].ravel().tolist()
        feature_values_sorted = feature_vals[sorted_idx].ravel().tolist()
        
        # Extract feature names one by one
        feature_names_sorted = []
        for i in range(len(sorted_idx)):
            idx_val = sorted_idx[i] 
            feature_names_sorted.append(str(self.feature_names[idx_val]))
        
        labels = [f"{name} = {val:.2f}" for name, val in zip(feature_names_sorted, feature_values_sorted)]
        colors = ['red' if val > 0 else 'green' for val in shap_values_sorted]
        
        # Create plot
        fig = go.Figure()
        
        fig.add_trace(go.Bar(
            x=shap_values_sorted,
            y=labels,
            orientation='h',
            marker_color=colors,
            text=[f"{val:.3f}" for val in shap_values_sorted],
            textposition='outside'
        ))
        
        fig.update_layout(
            title=f"SHAP Explanation - {self.current_data.loc[index, 'apex_domain']}<br>"
                  f"Prediction: {self.current_data.loc[index, 'prediction']} "
                  f"(Prob: {self.current_data.loc[index, 'malicious_prob']:.3f})",
            xaxis_title="Feature Impact",
            yaxis_title="Features",
            height=400,
            showlegend=False
        )
        
        fig.add_vline(x=0, line_dash="dash", line_color="gray")
        fig.show()
    
    def create_parallel_sets(self) -> go.Figure:
        """Create parallel sets visualization."""
        if self.filtered_data is None or len(self.filtered_data) == 0:
            return go.Figure()
        
        # Sample data if too large
        sample_data = self.filtered_data.sample(min(100, len(self.filtered_data)))
        
        # Categorize features
        categories = []
        
        try:
            # Domain length - use duplicates='drop' to handle edge cases
            length_bins = pd.qcut(sample_data['avg_domain_length'], q=3, labels=['Short', 'Medium', 'Long'], duplicates='drop')
            categories.append({'label': 'Domain Length', 'values': length_bins.astype(str).values})
        except ValueError:
            # If qcut fails, use simple cut or direct categorization
            median_length = sample_data['avg_domain_length'].median()
            length_bins = pd.cut(sample_data['avg_domain_length'], 
                                bins=[-np.inf, median_length*0.8, median_length*1.2, np.inf], 
                                labels=['Short', 'Medium', 'Long'])
            categories.append({'label': 'Domain Length', 'values': length_bins.astype(str).values})
        
        try:
            # Entropy - use duplicates='drop' to handle edge cases
            entropy_bins = pd.qcut(sample_data['avg_subdomain_entropy'], q=3, labels=['Low', 'Medium', 'High'], duplicates='drop')
            categories.append({'label': 'Entropy', 'values': entropy_bins.astype(str).values})
        except ValueError:
            # If qcut fails, use simple categorization
            median_entropy = sample_data['avg_subdomain_entropy'].median()
            entropy_bins = pd.cut(sample_data['avg_subdomain_entropy'], 
                                 bins=[-np.inf, median_entropy*0.8, median_entropy*1.2, np.inf], 
                                 labels=['Low', 'Medium', 'High'])
            categories.append({'label': 'Entropy', 'values': entropy_bins.astype(str).values})
        
        # Subdomains
        subdomain_bins = pd.cut(sample_data['unique_subdomain_count'], 
                                bins=[-1, 0, 5, float('inf')], 
                                labels=['None', 'Few', 'Many'])
        categories.append({'label': 'Subdomains', 'values': subdomain_bins.astype(str).values})
        
        # Prediction
        categories.append({'label': 'Prediction', 'values': sample_data['prediction'].values})
        
        # Create plot
        fig = go.Figure(data=[go.Parcats(
            dimensions=categories,
            line={'color': sample_data['malicious_prob'], 
                  'colorscale': 'RdYlGn_r',
                  'showscale': True,
                  'colorbar': {'title': 'Malicious<br>Probability'}},
            hoveron='color',
            hoverinfo='count+probability'
        )])
        
        fig.update_layout(
            title="Feature Flow to Prediction (Parallel Sets)",
            height=500
        )
        
        return fig


class DNSFeatureExtractor2:
    def __init__(self):
        """Initialize the feature extractor."""
        self.rr_types = ['A', 'AAAA', 'TXT', 'NULL', 'CNAME', 'MX', 'NS', 'PTR']
        self.ngram_features = [
            'ngram_ac', 'ngram_al', 'ngram_am', 'ngram_an', 'ngram_ar', 'ngram_as', 'ngram_at',
            'ngram_ch', 'ngram_co', 'ngram_com', 'ngram_de', 'ngram_ec', 'ngram_eco', 'ngram_ed',
            'ngram_el', 'ngram_en', 'ngram_er', 'ngram_es', 'ngram_et', 'ngram_ic', 'ngram_in',
            'ngram_it', 'ngram_la', 'ngram_le', 'ngram_li', 'ngram_ma', 'ngram_me', 'ngram_nc',
            'ngram_ne', 'ngram_net', 'ngram_ng', 'ngram_nt', 'ngram_om', 'ngram_on', 'ngram_or',
            'ngram_ra', 'ngram_rc', 'ngram_re', 'ngram_ri', 'ngram_ro', 'ngram_ru', 'ngram_sc',
            'ngram_sco', 'ngram_se', 'ngram_st', 'ngram_ta', 'ngram_tc', 'ngram_te', 'ngram_ti', 'ngram_to'
        ]
    
    def extract_subdomains(self, domain: str) -> List[str]:
        parts = domain.rstrip('.').split('.')
        return parts[:-2] if len(parts) > 2 else []
    
    def calculate_shannon_entropy(self, text: str) -> float:
        if not text:
            return 0.0
        counts = Counter(text)
        probabilities = [count / len(text) for count in counts.values()]
        return entropy(probabilities, base=2)
    
    def get_rr_type(self, type_code: int) -> str:
        type_map = {1: 'A', 28: 'AAAA', 16: 'TXT', 10: 'NULL', 
                    5: 'CNAME', 15: 'MX', 2: 'NS', 12: 'PTR'}
        return type_map.get(type_code, 'OTHER')
    
    def extract_features_with_timestamps(self, pcap_path: str) -> Tuple[List[Dict], List[datetime]]:
        """Extract features and timestamps from PCAP."""
        packets = rdpcap(pcap_path)
        
        domain_data = defaultdict(lambda: {
            'queries': [], 'subdomains': [], 'request_sizes': [],
            'rr_counts': Counter(), 'ttls': [], 'timestamps': []
        })
        
        for packet in tqdm(packets, desc="Processing packets"):
            try:
                if not packet.haslayer(DNS):
                    continue
                
                dns = packet[DNS]
                timestamp = datetime.fromtimestamp(float(packet.time))
                
                if dns.qd and hasattr(dns.qd, 'qname'):
                    qname_raw = dns.qd.qname
                    qname = qname_raw.decode('utf-8', errors='ignore') if isinstance(qname_raw, bytes) else str(qname_raw)
                    
                    if qname and qname != '.':
                        parts = qname.rstrip('.').split('.')
                        if len(parts) >= 2:
                            apex = '.'.join(parts[-2:])
                            domain_data[apex]['queries'].append(qname)
                            domain_data[apex]['request_sizes'].append(len(packet))
                            domain_data[apex]['timestamps'].append(timestamp)
                            domain_data[apex]['subdomains'].extend(self.extract_subdomains(qname))
                
                if dns.an:
                    an_records = dns.an if isinstance(dns.an, list) else [dns.an]
                    for rr in an_records:
                        if hasattr(rr, 'rrname'):
                            rrname_raw = rr.rrname
                            rrname = rrname_raw.decode('utf-8', errors='ignore') if isinstance(rrname_raw, bytes) else str(rrname_raw)
                            parts = [p for p in rrname.rstrip('.').split('.') if p]
                            if len(parts) >= 2:
                                apex = '.'.join(parts[-2:])
                                domain_data[apex]['rr_counts'][self.get_rr_type(rr.type)] += 1
                                if hasattr(rr, 'ttl'):
                                    domain_data[apex]['ttls'].append(rr.ttl)
            except:
                continue
        
        features = []
        timestamps = []
        
        for apex, data in domain_data.items():
            if data['queries']:
                feature_dict = {
                    'apex_domain': apex,
                    'avg_domain_length': float(np.mean([len(q) for q in data['queries']])),
                    'max_domain_length': int(max([len(q) for q in data['queries']])),
                    'longest_subdomain': int(max([len(s) for s in data['subdomains']])) if data['subdomains'] else 0,
                    'unique_subdomain_count': int(len(set(data['subdomains']))),
                    'avg_subdomain_entropy': float(np.mean([self.calculate_shannon_entropy(s) for s in data['subdomains']])) if data['subdomains'] else 0.0,
                    'request_size_mean': float(np.mean(data['request_sizes'])) if data['request_sizes'] else 0.0,
                    'request_size_var': float(np.var(data['request_sizes'])) if data['request_sizes'] else 0.0,
                    'ttl_mean': float(np.mean(data['ttls'])) if data['ttls'] else 0.0,
                    'ttl_var': float(np.var(data['ttls'])) if data['ttls'] else 0.0,
                }
                
                total_rr = sum(data['rr_counts'].values())
                for rr_type in self.rr_types:
                    feature_dict[f'rr_freq_{rr_type}'] = float(data['rr_counts'][rr_type] / total_rr if total_rr > 0 else 0)
                
                features.append(feature_dict)
                timestamps.append(min(data['timestamps']) if data['timestamps'] else datetime.now())
        
        return features, timestamps
    
    def add_ngram_features(self, features: List[Dict]) -> List[Dict]:
        """Add n-gram features."""
        for feature in features:
            apex = feature['apex_domain']
            subdomain_text = apex.replace('.', '')
            
            ngrams = set()
            for n in [2, 3]:
                for i in range(len(subdomain_text) - n + 1):
                    ngrams.add(subdomain_text[i:i+n])
            
            for ngram_feature in self.ngram_features:
                ngram = ngram_feature.replace('ngram_', '')
                feature[ngram_feature] = 1 if ngram in ngrams else 0
        
        return features

In [None]:
print("TunViz: DNS Tunneling Detection Dashboard")
print("="*50)

# Initialize dashboard
dashboard = DNSDashboard(model_path="./outputs/dns_model.pkl")

# File upload widget
upload_widget = widgets.FileUpload(
    accept='.pcap,.pcapng',
    multiple=False,
    description='Upload PCAP:'
)

output_area = widgets.Output()

def process_upload(change):
    """Process uploaded file."""
    with output_area:
        clear_output(wait=True)
        
        if upload_widget.value:
            # Save uploaded file temporarily
            # upload_widget.value is a tuple of uploaded file info
            uploaded_file_info = upload_widget.value[0]
            temp_path = Path('/tmp/temp_dns_analysis.pcap')
            temp_path.write_bytes(uploaded_file_info['content'])
            
            print(f"Uploaded file: {uploaded_file_info['name']}")
            
            # Analyze
            df = dashboard.analyze_pcap(str(temp_path))
            
            if not df.empty:
                # Create interactive table
                table_widget = dashboard.create_interactive_table()
                display(table_widget)
                
                # Create parallel sets plot
                print("\nParallel Sets Visualization:")
                try:
                    fig = dashboard.create_parallel_sets()
                    display(fig)
                except Exception as e:
                    print(f"Note: Could not display parallel sets plot. Error: {e}")
                    print("The interactive table with SHAP explanations is still available above.")

upload_widget.observe(process_upload, names='value')

# Display UI
display(widgets.VBox([
    widgets.HTML("<h2>Step 1: Upload PCAP File</h2>"),
    upload_widget,
    widgets.HTML("<p style='color: gray;'>Upload a PCAP file to begin analysis</p>"),
    output_area
]))

TunViz: DNS Tunneling Detection Dashboard
Model loaded from ./outputs/dns_model.pkl


VBox(children=(HTML(value='<h2>Step 1: Upload PCAP File</h2>'), FileUpload(value=(), accept='.pcap,.pcapng', d…


Dashboard ready! Upload a PCAP file to start.
