In [3]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import json
from pathlib import Path
import pandas as pd
from sklearn.metrics import silhouette_score, silhouette_samples
from typing import Dict, List


class MalwareBehaviorVisualizer:
    """Plot similarity, group-size and feature statistics for malware families.

    The similarity/group plots read ``self.similarity_matrix`` and
    ``self.groups``, which are filled in by ``load_data_from_aggregator`` or
    ``load_data_from_files``; call one of them before plotting.
    """

    def __init__(self, data_dir: str = '/data/saranyav/gcn_new/behavioral_analysis'):
        """Remember the data directory and apply the plotting style.

        Args:
            data_dir: Directory that ``load_data_from_files`` reads
                ``similarity_matrix.npy`` and ``behavioral_groups.json`` from.
        """
        self.data_dir = Path(data_dir)
        # Filled in by the load_* methods; defined here so the attributes
        # always exist on a constructed instance.
        self.family_distributions = None
        self.groups = None
        self.similarity_matrix = None
        # Newer matplotlib releases ship the seaborn style sheets under the
        # "seaborn-v0_8" name and reject the bare "seaborn" name, so pick
        # whichever one this installation actually provides.
        for style_name in ('seaborn-v0_8', 'seaborn'):
            if style_name in plt.style.available:
                plt.style.use(style_name)
                break
        sns.set_palette("husl")

    def load_data_from_aggregator(self, aggregator):
        """Load data directly from MalwareBehaviorAggregator instance.

        Copies ``aggregator.family_distributions`` and unpacks the pair
        returned by ``aggregator.create_behavioral_groups()`` into
        ``self.groups`` and ``self.similarity_matrix``.
        """
        self.family_distributions = aggregator.family_distributions
        self.groups, self.similarity_matrix = aggregator.create_behavioral_groups()

    def load_data_from_files(self):
        """Load previously saved data from files in ``self.data_dir``.

        Sets ``self.similarity_matrix`` and ``self.groups`` only;
        ``self.family_distributions`` is not touched by this loader.
        """
        self.similarity_matrix = np.load(self.data_dir / 'similarity_matrix.npy')
        with open(self.data_dir / 'behavioral_groups.json', 'r', encoding='utf-8') as f:
            self.groups = json.load(f)

    def plot_similarity_heatmap(self, figsize=(12, 10)):
        """Plot similarity matrix heatmap (tick labels are suppressed)."""
        plt.figure(figsize=figsize)
        sns.heatmap(self.similarity_matrix,
                    cmap='viridis',
                    xticklabels=False,
                    yticklabels=False)
        plt.title('Family Similarity Matrix')
        plt.xlabel('Family Index')
        plt.ylabel('Family Index')
        plt.show()

    def plot_group_sizes(self, figsize=(10, 6)):
        """Plot distribution of behavioral group sizes.

        Draws two figures from ``len(members)`` of every entry in
        ``self.groups``: a 20-bin histogram and a box plot.
        """
        group_sizes = [len(families) for families in self.groups.values()]

        plt.figure(figsize=figsize)
        sns.histplot(group_sizes, bins=20)
        plt.title('Distribution of Behavioral Group Sizes')
        plt.xlabel('Number of Families in Group')
        plt.ylabel('Count')
        plt.show()

        # Box plot
        plt.figure(figsize=figsize)
        sns.boxplot(x=group_sizes)
        plt.title('Behavioral Group Size Distribution')
        plt.xlabel('Group Size')
        plt.show()

    def _distance_matrix(self) -> np.ndarray:
        """Return ``1 - similarity`` as a new float array with a zero diagonal.

        ``silhouette_score(..., metric='precomputed')`` rejects matrices whose
        diagonal is not zero, so self-distances are forced to exactly 0 even
        when the stored self-similarity is not exactly 1.
        """
        distances = 1.0 - np.asarray(self.similarity_matrix, dtype=float)
        np.fill_diagonal(distances, 0.0)
        return distances

    def plot_silhouette_analysis(self, n_clusters_range=range(2, 11)):
        """Plot silhouette analysis for different numbers of clusters.

        For every value in ``n_clusters_range`` the families are clustered
        with average-linkage agglomerative clustering on the precomputed
        distance matrix, and the silhouette score of that labelling is plotted.

        Args:
            n_clusters_range: Iterable of cluster counts to evaluate.
        """
        # NOTE(review): the ``metric`` keyword needs a scikit-learn release
        # that accepts it (older releases named it ``affinity``).
        from sklearn.cluster import AgglomerativeClustering

        # The distance matrix does not depend on the cluster count.
        distance_matrix = self._distance_matrix()
        # Materialise once so a one-shot iterable can be both looped and plotted.
        cluster_counts = list(n_clusters_range)

        silhouette_scores = []
        for n_clusters in cluster_counts:
            clustering = AgglomerativeClustering(
                n_clusters=n_clusters,
                metric='precomputed',
                linkage='average'
            )
            labels = clustering.fit_predict(distance_matrix)
            score = silhouette_score(distance_matrix, labels, metric='precomputed')
            silhouette_scores.append(score)

        # Open the figure only after the scores exist, so a clustering error
        # does not leave an empty figure behind.
        plt.figure(figsize=(10, 6))
        plt.plot(cluster_counts, silhouette_scores, 'bo-')
        plt.title('Silhouette Score vs Number of Clusters')
        plt.xlabel('Number of Clusters')
        plt.ylabel('Silhouette Score')
        plt.grid(True)
        plt.show()

    def plot_feature_distributions(self, family_distributions: Dict, features: List[str] = None):
        """Plot feature distributions across families.

        For each feature, collects ``dist['feature_stats'][feature]['histogram']``
        from every family that has it and plots the element-wise mean with a
        +/- one standard deviation band.

        Args:
            family_distributions: Mapping of family -> distribution dict.
            features: Feature names to plot, one subplot each. Defaults to
                ``['mem_ops', 'calls', 'instructions', 'stack_ops']``.
        """
        if features is None:
            features = ['mem_ops', 'calls', 'instructions', 'stack_ops']

        n_features = len(features)
        if n_features == 0:
            # plt.subplots cannot build a grid with zero rows.
            return

        # squeeze=False keeps ``axes`` two-dimensional even for one feature;
        # otherwise a single Axes object is returned and cannot be indexed.
        _, axes = plt.subplots(n_features, 1, figsize=(12, 4 * n_features), squeeze=False)

        for ax, feature in zip(axes[:, 0], features):
            feature_data = [
                dist['feature_stats'][feature]['histogram']
                for dist in family_distributions.values()
                if 'feature_stats' in dist and feature in dist['feature_stats']
            ]

            if not feature_data:
                ax.set_title(f'{feature} Distribution (no data)')
                continue

            # Plot average distribution
            # NOTE(review): assumes every family's histogram for a feature has
            # the same length -- confirm against the aggregator.
            mean_dist = np.mean(feature_data, axis=0)
            std_dist = np.std(feature_data, axis=0)
            x = range(len(mean_dist))

            ax.plot(x, mean_dist, 'b-', label='Mean')
            ax.fill_between(x,
                            mean_dist - std_dist,
                            mean_dist + std_dist,
                            alpha=0.3,
                            label='±1 std')
            ax.set_title(f'{feature} Distribution')
            ax.legend()

        plt.tight_layout()
        plt.show()

    @staticmethod
    def _collect_rows(family_distributions: Dict, section: str, keys: List[str]) -> List[List]:
        """Return one row of ``keys`` values per family that has ``section``.

        Keys missing from a family's section count as 0; families without the
        section are left out entirely.
        """
        return [
            [dist[section].get(key, 0) for key in keys]
            for dist in family_distributions.values()
            if section in dist
        ]

    def _boxplot_section(self, family_distributions: Dict, section: str,
                         keys: List[str], title: str, figsize):
        """Draw one box per key of ``section``; do nothing if no family has it."""
        rows = self._collect_rows(family_distributions, section, keys)
        if not rows:
            return

        plt.figure(figsize=figsize)
        sns.boxplot(data=pd.DataFrame(np.array(rows), columns=keys))
        plt.title(title)
        plt.xticks(rotation=45)
        plt.ylabel('Frequency')
        plt.show()

    def plot_behavior_patterns(self, family_distributions: Dict):
        """Plot common behavior patterns across families.

        Reads ``dist['behavior_patterns']`` of every family and shows a box
        plot per pattern; nothing is drawn if no family has that section.
        """
        patterns = ['ext_call', 'mem_rw', 'cond_jump', 'ext_write']
        self._boxplot_section(family_distributions, 'behavior_patterns', patterns,
                              'Distribution of Behavior Patterns', (10, 6))

    def plot_structural_features(self, family_distributions: Dict):
        """Plot structural features across families.

        Reads ``dist['local_structures']`` of every family and shows a box
        plot per structure; nothing is drawn if no family has that section.
        """
        structures = ['branching_nodes', 'merge_nodes', 'terminal_nodes',
                      'isolated_nodes', 'dense_regions']
        self._boxplot_section(family_distributions, 'local_structures', structures,
                              'Distribution of Structural Features', (12, 6))

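# Usage example (load data first; the plot methods read what the loaders set):
# visualizer = MalwareBehaviorVisualizer()
# visualizer.load_data_from_aggregator(aggregator)  # or visualizer.load_data_from_files()
# visualizer.plot_similarity_heatmap()
# visualizer.plot_group_sizes()
# visualizer.plot_silhouette_analysis()
# family_distributions = visualizer.family_distributions  # set by load_data_from_aggregator only
# visualizer.plot_feature_distributions(family_distributions)
# visualizer.plot_behavior_patterns(family_distributions)
# visualizer.plot_structural_features(family_distributions)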

In [4]:
import sys

# family_aggregator is imported from a directory that is not on the default
# module path. Register it only once: this cell is re-run interactively and an
# unconditional append would add a duplicate sys.path entry on every run.
processing_dir = '/data/saranyav/gcn_new/Processing/'
if processing_dir not in sys.path:
    sys.path.append(processing_dir)
from family_aggregator import MalwareBehaviorAggregator

# Initialize and load data (the 'train' split of the batch directory).
aggregator = MalwareBehaviorAggregator(batch_dir=Path('/data/saranyav/gcn_new/bodmas_batches_new'))
aggregator.load_processed_batches(split='train')
aggregator.process_families()

# Initialize visualizer with data from aggregator and draw the heatmap.
visualizer = MalwareBehaviorVisualizer()
visualizer.load_data_from_aggregator(aggregator)
visualizer.plot_similarity_heatmap()

# Further plots can be produced from the same visualizer, e.g.:
# visualizer.plot_feature_distributions(visualizer.family_distributions)

2025-01-02 18:02:32,087 - INFO - Loading batches from /data/saranyav/gcn_new/bodmas_batches_new/train
Loading batches: 100%|██████████| 352/352 [00:13<00:00, 25.94it/s]
2025-01-02 18:02:45,661 - INFO - Loaded 488 families
2025-01-02 18:02:45,662 - INFO - 
Malware type distribution:
2025-01-02 18:02:45,662 - INFO - trojan: 311 families
2025-01-02 18:02:45,662 - INFO - worm: 77 families
2025-01-02 18:02:45,663 - INFO - backdoor: 32 families
2025-01-02 18:02:45,663 - INFO - ransomware: 16 families
2025-01-02 18:02:45,663 - INFO - informationstealer: 15 families
2025-01-02 18:02:45,663 - INFO - dropper: 10 families
2025-01-02 18:02:45,663 - INFO - downloader: 10 families
2025-01-02 18:02:45,664 - INFO - virus: 7 families
2025-01-02 18:02:45,664 - INFO - pua: 5 families
2025-01-02 18:02:45,664 - INFO - cryptominer: 2 families
2025-01-02 18:02:45,664 - INFO - trojan-gamethief: 1 families
2025-01-02 18:02:45,664 - INFO - exploit: 1 families
2025-01-02 18:02:45,665 - INFO - rootkit: 1 families

In [None]:
import json
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict, Counter
from pathlib import Path
import logging

# Configure the root logger (INFO level, "time - level - message" lines) and
# create the logger used by the code below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

class BehavioralGroupAnalyzer:
    """Analyze behavioral groups for insights on malware families.

    ``groups`` is the JSON object loaded from ``groups_file`` (iterated as
    group id -> list of members) and ``similarity_matrix`` is the array loaded
    from ``similarity_matrix_file``. Both are loaded eagerly by ``__init__``.
    """

    def __init__(self, groups_file: Path, similarity_matrix_file: Path):
        """Store both paths and load their contents immediately.

        Args:
            groups_file: JSON file with the behavioral groups.
            similarity_matrix_file: ``.npy`` file with the similarity matrix.
        """
        self.groups_file = groups_file
        self.similarity_matrix_file = similarity_matrix_file
        self.groups = self._load_groups()
        self.similarity_matrix = self._load_similarity_matrix()

    def _load_groups(self) -> dict:
        """Read and return the JSON content of ``self.groups_file``."""
        logger.info("Loading behavioral groups from %s", self.groups_file)
        with open(self.groups_file, 'r', encoding='utf-8') as f:
            groups = json.load(f)
        logger.info("Loaded %d behavioral groups.", len(groups))
        return groups

    def _load_similarity_matrix(self) -> np.ndarray:
        """Read and return the array stored in ``self.similarity_matrix_file``."""
        logger.info("Loading similarity matrix from %s", self.similarity_matrix_file)
        return np.load(self.similarity_matrix_file)

    def summarize_groups(self):
        """Log group-size statistics and save ``group_size_distribution.png``.

        The logged "Total samples" is the sum of the group sizes, i.e. the
        number of group members. With no groups, only the counts are logged
        and no figure is written.
        """
        sizes = [len(members) for members in self.groups.values()]
        logger.info("Total samples: %d", sum(sizes))
        logger.info("Number of groups: %d", len(sizes))
        if not sizes:
            # The mean of an empty list is NaN and there is nothing to plot.
            logger.warning("No behavioral groups loaded; skipping group size plot.")
            return
        logger.info("Average group size: %.2f", np.mean(sizes))

        plt.figure(figsize=(10, 6))
        sns.histplot(sizes, bins=20, kde=False)
        plt.title("Behavioral Group Size Distribution")
        plt.xlabel("Group Size")
        plt.ylabel("Count")
        plt.tight_layout()
        plt.savefig("group_size_distribution.png")
        plt.close()

    def collect_group_similarities(self, family_order=None) -> tuple:
        """Split the similarity matrix into intra- and inter-group values.

        Every group member is first mapped to its row/column in the similarity
        matrix. Intra-group values are the off-diagonal entries between
        members of the same group; inter-group values are the entries between
        a group's members and the members of every other group (each ordered
        pair is included, so a symmetric matrix contributes each value twice).

        Args:
            family_order: Sequence whose i-th entry is the family stored in
                row/column i of the similarity matrix. When omitted, families
                are assumed to be stored in the order in which they first
                appear in ``self.groups``; that cannot be verified here, so a
                warning is logged.

        Returns:
            ``(intra, inter)`` as two 1-D NumPy arrays (possibly empty).

        Raises:
            ValueError: If the family order refers to a row beyond the matrix.
        """
        sim = np.asarray(self.similarity_matrix)
        member_lists = list(self.groups.values())

        if family_order is None:
            logger.warning(
                "No family_order given; assuming similarity matrix rows follow "
                "the order in which families first appear in the groups file."
            )
            family_order = list(dict.fromkeys(
                member for members in member_lists for member in members
            ))

        # family -> matrix row; the first occurrence wins on duplicates.
        index_of = {}
        for row, family in enumerate(family_order):
            index_of.setdefault(family, row)
        if index_of and max(index_of.values()) >= sim.shape[0]:
            raise ValueError(
                f"family order refers to row {max(index_of.values())} but the "
                f"similarity matrix has only {sim.shape[0]} rows"
            )

        group_rows = []
        skipped = 0
        for members in member_lists:
            group_rows.append(list(dict.fromkeys(
                index_of[member] for member in members if member in index_of
            )))
            skipped += sum(1 for member in members if member not in index_of)
        if skipped:
            logger.warning(
                "%d group members are missing from family_order and were skipped.",
                skipped,
            )

        intra_parts = []
        inter_parts = []
        for position, rows in enumerate(group_rows):
            if len(rows) > 1:
                block = sim[np.ix_(rows, rows)]
                # Drop the diagonal: a family compared with itself is not a pair.
                intra_parts.append(block[~np.eye(len(rows), dtype=bool)])
            others = [
                col
                for other_position, cols in enumerate(group_rows)
                if other_position != position
                for col in cols
            ]
            if rows and others:
                inter_parts.append(sim[np.ix_(rows, others)].ravel())

        intra = np.concatenate(intra_parts) if intra_parts else np.empty(0, dtype=float)
        inter = np.concatenate(inter_parts) if inter_parts else np.empty(0, dtype=float)
        return intra, inter

    def analyze_similarity_distribution(self, family_order=None):
        """Plot intra- vs. inter-group similarity histograms.

        Saves the figure as ``similarity_distributions.png``.

        Args:
            family_order: Optional row order of the similarity matrix; see
                ``collect_group_similarities``.
        """
        intra_group_sims, inter_group_sims = self.collect_group_similarities(family_order)

        plt.figure(figsize=(10, 6))
        # Empty inputs are skipped so the histogram/KDE code never sees them.
        if intra_group_sims.size:
            sns.histplot(intra_group_sims, bins=50, kde=True, color='blue', label='Intra-group')
        if inter_group_sims.size:
            sns.histplot(inter_group_sims, bins=50, kde=True, color='red', label='Inter-group')
        if intra_group_sims.size or inter_group_sims.size:
            plt.legend()
        plt.title("Similarity Distributions: Intra- vs. Inter-group")
        plt.xlabel("Similarity Score")
        plt.ylabel("Frequency")
        plt.tight_layout()
        plt.savefig("similarity_distributions.png")
        plt.close()

    def analyze_family_composition(self):
        """Log and plot the ten members that occur most often across groups.

        Saves the bar chart as ``top_families.png``. With no members at all,
        only the header line is logged and no figure is written.
        """
        family_counter = Counter(
            member for members in self.groups.values() for member in members
        )

        top_families = family_counter.most_common(10)
        logger.info("Top 10 families by occurrence:")
        for family, count in top_families:
            logger.info("%s: %s", family, count)

        if not top_families:
            # zip(*[]) yields nothing to unpack, so stop before plotting.
            logger.warning("No families found in the behavioral groups; skipping plot.")
            return

        families, counts = zip(*top_families)
        plt.figure(figsize=(10, 6))
        # Labels are passed as strings so the y axis is always categorical.
        sns.barplot(x=list(counts), y=[str(family) for family in families], orient='h')
        plt.title("Top 10 Families by Occurrence")
        plt.xlabel("Count")
        plt.ylabel("Family")
        plt.tight_layout()
        plt.savefig("top_families.png")
        plt.close()

if __name__ == "__main__":
    # Input artefacts: the saved groups and the matching similarity matrix.
    groups_path = Path('/data/saranyav/gcn_new/behavioral_analysis/behavioral_groups.json')
    matrix_path = Path('/data/saranyav/gcn_new/behavioral_analysis/similarity_matrix.npy')
    analyzer = BehavioralGroupAnalyzer(
        groups_file=groups_path,
        similarity_matrix_file=matrix_path,
    )

    # Run the three analyses in their original order.
    for run_step in (
        analyzer.summarize_groups,
        analyzer.analyze_similarity_distribution,
        analyzer.analyze_family_composition,
    ):
        run_step()

    logger.info("Behavioral group analysis complete.")