# Malware Analysis Data Collector

This notebook implements a comprehensive malware analysis pipeline using the VirusTotal API. The goal is to:
1. Collect metadata and behavioral data for different malware families
2. Extract key behavioral indicators
3. Map behaviors to MITRE ATT&CK techniques
4. Generate detailed analysis reports

The analysis focuses on several malware families including:
- Emotet/Heodo
- Dridex
- AgentTesla
- Tinba
- TrickBot

### Import Libraries

### Install Required Packages
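Before running the import cell below, make sure the required third-party packages (`requests` and `pandas`) are installed, e.g. with `pip install requests pandas`.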

In [None]:
# Standard libraries for data processing and file operations
import requests  # For making HTTP requests to VirusTotal API
import pandas as pd  # For data manipulation and analysis
import csv  # For reading/writing CSV files
import json  # For JSON data handling
import time  # For implementing delays between API requests
from typing import List, Dict  # Type hints for better code readability
from datetime import datetime  # For timestamps
import os  # For file/directory operations

print("Required libraries imported successfully!")

In [None]:
# ===== CONFIGURATION =====

# VirusTotal API configuration
# The API key is a credential, so it is read from the environment instead of
# being stored in the notebook. Export VIRUSTOTAL_API_KEY before starting
# Jupyter. Any key that was ever committed to this file should be rotated.
API_KEY = os.environ.get("VIRUSTOTAL_API_KEY", "").strip()

# Rate limiting configuration
# Free tier allows 4 requests/minute, so we need 15 seconds between requests
# Using 16 seconds to be safe
DELAY_BETWEEN_REQUESTS = 16  # seconds

# Set up request headers with API key
headers = {"x-apikey": API_KEY}

print("Configuration completed:")
print(f"- API Key configured: {'‚úì' if API_KEY else '‚úó'}")
print(f"- Rate limit delay: {DELAY_BETWEEN_REQUESTS} seconds")
if not API_KEY:
    # Every request would be rejected without a key, so say how to fix it
    print("  Set the VIRUSTOTAL_API_KEY environment variable and re-run this cell.")

In [None]:
# ===== HELPER FUNCTIONS =====

def read_hash_file(file_name: str, hash_col: str = "hash") -> List[Dict]:
    """
    Read malware hash values and related information from a CSV file.

    Rows whose hash is empty, missing, or the placeholder
    ``example_hash_here`` are skipped. Hashes are stripped and lower-cased.
    A missing or empty ``malware_family`` / ``source`` value becomes
    ``'Unknown'``.

    Args:
        file_name (str): Path to the CSV file containing hash data
        hash_col (str): Name of the column containing hash values (default: "hash")

    Returns:
        List[Dict]: One dict per usable row with the keys 'hash', 'family'
        and 'source'. An empty list is returned when the file is missing,
        unreadable, or lacks the header / hash column.
    """
    print(f"\nüîç Reading hash data from {file_name}...")

    hash_data = []
    try:
        # utf-8-sig also accepts plain UTF-8 and drops a leading BOM, which
        # would otherwise become part of the first header name
        with open(file_name, newline='', encoding='utf-8-sig') as fh:
            reader = csv.DictReader(fh)

            if reader.fieldnames is None:
                raise ValueError("CSV file appears to have no header row.")

            if hash_col not in reader.fieldnames:
                raise ValueError(f"Column '{hash_col}' not found in CSV header: {reader.fieldnames}")

            print("Processing rows...")
            for row in reader:
                # DictReader fills columns missing from a short row with None,
                # so a default passed to .get() is never used for them
                raw_hash = (row.get(hash_col) or "").strip().lower()
                if not raw_hash or raw_hash == "example_hash_here":
                    continue
                hash_data.append({
                    'hash': raw_hash,
                    'family': row.get('malware_family') or 'Unknown',
                    'source': row.get('source') or 'Unknown'
                })

    except FileNotFoundError:
        print(f"‚úó Error: File '{file_name}' not found!")
        return []
    # ValueError covers the header checks above as well as decoding errors
    except (OSError, ValueError, csv.Error) as e:
        print(f"‚úó Error reading file: {e}")
        return []

    print(f"‚úì Successfully loaded {len(hash_data)} hashes")
    return hash_data

In [None]:
def get_file_report(hash_val: str, timeout: float = 30.0) -> Dict:
    """
    Retrieve file report from VirusTotal API for a given hash.

    Args:
        hash_val (str): The hash value to lookup
        timeout (float): Seconds to wait for the server before giving up
            (default: 30)

    Returns:
        Dict: ``{'success': True, 'data': <parsed JSON>}`` on HTTP 200,
        otherwise ``{'success': False, 'error': <message>}``
    """
    url = f"https://www.virustotal.com/api/v3/files/{hash_val}"
    try:
        print(f"  ‚Üí API Request: GET {url}")
        # Without a timeout a stalled connection would block the run forever
        response = requests.get(url, headers=headers, timeout=timeout)

        if response.status_code == 200:
            print("  ‚úì Request successful")
            return {'success': True, 'data': response.json()}
        if response.status_code == 404:
            print("  ‚úó Hash not found on VirusTotal")
            return {'success': False, 'error': 'Hash not found on VirusTotal'}

        print(f"  ‚úó Request failed: HTTP {response.status_code}")
        return {'success': False, 'error': f'Error {response.status_code}'}
    # ValueError covers a 200 response whose body is not valid JSON
    except (requests.RequestException, ValueError) as e:
        print(f"  ‚úó Request error: {str(e)}")
        return {'success': False, 'error': str(e)}

In [None]:
def get_behavior_report(hash_val: str, timeout: float = 30.0) -> Dict:
    """
    Retrieve the behaviour summary from VirusTotal API for a given hash.

    Args:
        hash_val (str): The hash value to lookup
        timeout (float): Seconds to wait for the server before giving up
            (default: 30)

    Returns:
        Dict: ``{'success': True, 'data': <parsed JSON>}`` on HTTP 200,
        otherwise ``{'success': False, 'error': <message>}``
    """
    url = f"https://www.virustotal.com/api/v3/files/{hash_val}/behaviour_summary"
    try:
        print(f"  ‚Üí API Request: GET {url}")
        # Without a timeout a stalled connection would block the run forever
        response = requests.get(url, headers=headers, timeout=timeout)

        if response.status_code == 200:
            print("  ‚úì Request successful")
            return {'success': True, 'data': response.json()}
        if response.status_code == 404:
            print("  ‚úó No behavior data available")
            return {'success': False, 'error': 'No behavior data available'}

        print(f"  ‚úó Request failed: HTTP {response.status_code}")
        return {'success': False, 'error': f'Error {response.status_code}'}
    # ValueError covers a 200 response whose body is not valid JSON
    except (requests.RequestException, ValueError) as e:
        print(f"  ‚úó Request error: {str(e)}")
        return {'success': False, 'error': str(e)}

def extract_behavioral_indicators(behavior_data: Dict) -> Dict:
    """
    Extract behavioral indicators from VirusTotal behavior data.

    Args:
        behavior_data (Dict): Wrapper of the form
            ``{'success': bool, 'data': <API response JSON>}``

    Returns:
        Dict: Always contains every indicator key, each mapped to a list.
        Lists are empty when the input is invalid, unsuccessful, or the
        field is absent, null, or not a list. ``dns_lookups`` holds host
        names, ``ip_traffic`` holds ``"ip:port"`` strings,
        ``http_conversations`` holds URLs and ``mitre_techniques`` holds the
        technique dicts as delivered.
    """
    print("  ‚Üí Extracting behavioral indicators...")

    # Fields that are copied through unchanged
    passthrough_keys = (
        'processes_created',
        'files_written',
        'files_deleted',
        'registry_keys_set',
        'registry_keys_deleted',
        'command_executions',
        'mutexes_created',
        'services_created',
    )

    indicators = {
        'processes_created': [],
        'files_written': [],
        'files_deleted': [],
        'registry_keys_set': [],
        'registry_keys_deleted': [],
        'dns_lookups': [],
        'ip_traffic': [],
        'http_conversations': [],
        'command_executions': [],
        'mutexes_created': [],
        'services_created': [],
        'mitre_techniques': []
    }

    if not isinstance(behavior_data, dict):
        print("  ‚úó behavior_data is None or not a dict")
        return indicators

    if not behavior_data.get('success', False):
        print("  ‚úó No successful behavioral data")
        return indicators

    payload = behavior_data.get('data')
    report = payload.get('data') if isinstance(payload, dict) else None
    if not isinstance(report, dict):
        report = {}

    # NOTE(review): the behaviour_summary endpoint appears to return the
    # indicator fields directly under "data" rather than under
    # "data.attributes" - both layouts are accepted; confirm against the
    # API reference.
    nested = report.get('attributes')
    attributes = nested if isinstance(nested, dict) else report

    def list_field(key):
        # A null or non-list value must not leak out: callers take len() of it
        value = attributes.get(key)
        return value if isinstance(value, list) else []

    def dict_entries(key):
        return [entry for entry in list_field(key) if isinstance(entry, dict)]

    for key in passthrough_keys:
        indicators[key] = list_field(key)

    indicators['dns_lookups'] = [
        lookup.get('hostname', '') for lookup in dict_entries('dns_lookups')
    ]
    indicators['ip_traffic'] = [
        f"{ip.get('destination_ip', '')}:{ip.get('destination_port', '')}"
        for ip in dict_entries('ip_traffic')
    ]
    indicators['http_conversations'] = [
        conv.get('url', '') for conv in dict_entries('http_conversations')
    ]
    indicators['mitre_techniques'] = dict_entries('mitre_attack_techniques')

    print("  ‚úì Behavioral indicators extracted successfully")
    return indicators

In [None]:
# Collect data for all malware hash samples and save results to files
def collect_sample_data(hash_data_list: List[Dict], output_dir: str = "output"):
    """
    Query VirusTotal for every sample and persist the results.

    For each entry the file report is fetched first. When that succeeds the
    behaviour summary is fetched too, both raw responses are written to
    ``<output_dir>/raw_responses``, and a flat summary record is built.
    When it fails a short 'failed' record is stored instead. A partial CSV
    is written after every fifth sample; the full result set is written as
    CSV and JSON at the end.

    Args:
        hash_data_list (List[Dict]): Entries with a 'hash' key and optional
            'family' / 'source' keys
        output_dir (str): Directory that receives all output files
            (default: "output")

    Returns:
        List[Dict]: One record per sample; 'status' is 'success' or 'failed'
    """
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(f"{output_dir}/raw_responses", exist_ok=True)

    results = []
    total = len(hash_data_list)

    # Worst case is two requests per sample with a pause after each request,
    # except that nothing is waited for after the very last one.
    max_pauses = max(2 * total - 1, 0)

    print(f"\n{'='*60}")
    print(f"Starting data collection for {total} samples...")
    print(f"Estimated time: {(max_pauses * DELAY_BETWEEN_REQUESTS) / 60:.1f} minutes")
    print(f"{'='*60}\n")

    for idx, hash_info in enumerate(hash_data_list, 1):
        hash_val = hash_info['hash']
        family = hash_info.get('family', 'Unknown')

        print(f"[{idx}/{total}] Processing: {hash_val[:16]}... ({family})")

        print(f"  ‚Üí Fetching file report...")
        file_report = get_file_report(hash_val)

        if not file_report['success']:
            print(f"  ‚úó {file_report['error']}")
            result = {
                'hash': hash_val,
                'family': family,
                'status': 'failed',
                'error': file_report['error']
            }
        else:
            # Pause between the two requests made for this sample
            time.sleep(DELAY_BETWEEN_REQUESTS)

            print(f"  ‚Üí Fetching behavior report...")
            behavior_report = get_behavior_report(hash_val)

            # Keep the raw responses so the run can be re-analysed offline
            with open(f"{output_dir}/raw_responses/{hash_val}_file.json", 'w', encoding='utf-8') as f:
                json.dump(file_report, f, indent=2)
            with open(f"{output_dir}/raw_responses/{hash_val}_behavior.json", 'w', encoding='utf-8') as f:
                json.dump(behavior_report, f, indent=2)

            indicators = extract_behavioral_indicators(behavior_report)

            # Null-tolerant walk down to the file attributes
            report_body = file_report.get('data') or {}
            attrs = (report_body.get('data') or {}).get('attributes') or {}
            stats = attrs.get('last_analysis_stats') or {}

            result = {
                'hash': hash_val,
                'family': family,
                'source': hash_info.get('source', 'Unknown'),
                'status': 'success',
                'detection_ratio': f"{stats.get('malicious', 0)}/{sum(stats.values())}",
                'first_seen': attrs.get('first_submission_date', 'N/A'),
                'last_seen': attrs.get('last_analysis_date', 'N/A'),
                'processes_count': len(indicators['processes_created']),
                'files_written_count': len(indicators['files_written']),
                'files_deleted_count': len(indicators['files_deleted']),
                'registry_keys_set_count': len(indicators['registry_keys_set']),
                'dns_lookups_count': len(indicators['dns_lookups']),
                'ip_connections_count': len(indicators['ip_traffic']),
                'http_requests_count': len(indicators['http_conversations']),
                'mutexes_count': len(indicators['mutexes_created']),
                'mitre_techniques_count': len(indicators['mitre_techniques']),
                # str()/or guard: join() rejects a null technique id
                'mitre_techniques': ', '.join(
                    str(t.get('id') or '') for t in indicators['mitre_techniques']
                ),
                'collected_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

            print(f"  ‚úì Success! Detections: {result['detection_ratio']}, "
                  f"MITRE Techniques: {result['mitre_techniques_count']}")

        results.append(result)

        # Checkpoint every 5 samples, whether or not this one succeeded
        if idx % 5 == 0:
            df_temp = pd.DataFrame(results)
            df_temp.to_csv(f"{output_dir}/analysis_results_partial.csv", index=False)
            print(f"\n  üíæ Progress saved ({idx}/{total} completed)\n")

        # Wait before next request (except after last one)
        if idx < total:
            print(f"  ‚è≥ Waiting {DELAY_BETWEEN_REQUESTS}s (rate limit)...\n")
            time.sleep(DELAY_BETWEEN_REQUESTS)

    # Save final results as CSV and JSON
    df_final = pd.DataFrame(results)
    df_final.to_csv(f"{output_dir}/analysis_results.csv", index=False)
    df_final.to_json(f"{output_dir}/analysis_results.json", orient="records", indent=2)

    print(f"\n{'='*60}")
    print(f"‚úì Data collection complete!")
    print(f"  Total samples: {total}")
    print(f"  Successful: {sum(1 for r in results if r['status'] == 'success')}")
    print(f"  Failed: {sum(1 for r in results if r['status'] == 'failed')}")
    print(f"\nResults saved to:")
    print(f"  - {output_dir}/analysis_results.csv")
    print(f"  - {output_dir}/analysis_results.json")
    print(f"{'='*60}\n")

    return results

# Generate a summary report based on collected results
def generate_summary_report(results: List[Dict], output_dir: str = "output"):
    """
    Print a console summary of the collected results.

    Args:
        results (List[Dict]): Records as produced by ``collect_sample_data``;
            may be empty
        output_dir (str): Accepted for interface compatibility; nothing is
            written by this function

    Returns:
        None
    """
    df = pd.DataFrame(results)

    # An empty result list yields a frame without any columns at all
    if 'status' in df.columns:
        successful = df[df['status'] == 'success']
    else:
        successful = df.iloc[0:0]

    print("\n" + "="*60)
    print("COLLECTION SUMMARY REPORT")
    print("="*60)

    print(f"\nTotal samples processed: {len(results)}")
    print(f"Successful collections: {len(successful)}")
    print(f"Failed collections: {len(results) - len(successful)}")

    if len(successful) > 0:
        print(f"\nSamples by Malware Family:")
        family_counts = successful['family'].value_counts()
        for family, count in family_counts.items():
            print(f"  {family}: {count}")

        print(f"\nBehavioral Indicators Summary:")
        print(f"  Avg processes created: {successful['processes_count'].mean():.1f}")
        print(f"  Avg files written: {successful['files_written_count'].mean():.1f}")
        print(f"  Avg registry modifications: {successful['registry_keys_set_count'].mean():.1f}")
        print(f"  Avg DNS lookups: {successful['dns_lookups_count'].mean():.1f}")
        print(f"  Avg network connections: {successful['ip_connections_count'].mean():.1f}")

        # Each cell holds a comma-joined list of IDs, so the cells must be
        # split before counting; counting distinct cells would count
        # distinct combinations instead of distinct techniques.
        unique_techniques = set()
        for joined in successful['mitre_techniques'].dropna():
            for technique in str(joined).split(','):
                technique = technique.strip()
                if technique:
                    unique_techniques.add(technique)

        print(f"\nMITRE ATT&CK Coverage:")
        print(f"  Samples with MITRE techniques: {(successful['mitre_techniques_count'] > 0).sum()}")
        print(f"  Total unique techniques identified: {len(unique_techniques)}")

    print("="*60 + "\n")


In [49]:
# ===== MAIN EXECUTION =====

# Banner: blank line, rule, title, rule, blank line
print(
    "\n" + "="*60,
    "VirusTotal Malware Analysis Pipeline",
    "="*60 + "\n",
    sep="\n",
)

# Locations of the input hash lists and of everything the pipeline writes
INPUT_DIR = "./hash_data"
OUTPUT_DIR = "./output"

# Create the output directory on first use; an existing path is left alone
try:
    os.makedirs(OUTPUT_DIR)
except FileExistsError:
    pass
else:
    print(f"‚úì Created output directory: {OUTPUT_DIR}")

# First, we'll convert our JSON data to a CSV format
def convert_json_to_csv(json_files=None, input_dir=None, output_dir=None):
    """
    Convert the JSON malware data files to a single CSV for processing.

    Each input file holds either a list of sample objects or an object whose
    'data' key holds that list. Samples without a 'sha256_hash' are skipped,
    and a hash that was already seen in this run is written only once, so
    overlapping input files do not cause repeated API lookups.

    Args:
        json_files: File names to read from the input directory
            (default: the three bundled family files)
        input_dir: Directory containing the JSON files (default: INPUT_DIR)
        output_dir: Directory receiving the CSV (default: OUTPUT_DIR)

    Returns:
        str: Path of the written CSV with the columns 'hash',
        'malware_family' and 'source'
    """
    print("\nüîÑ Converting JSON data to CSV format...")

    # Defaults are resolved at call time so the module-level settings apply
    if json_files is None:
        json_files = ['AgentTesla.json', 'Dridex.json', 'all_7_families.json']
    if input_dir is None:
        input_dir = INPUT_DIR
    if output_dir is None:
        output_dir = OUTPUT_DIR

    all_samples = []
    seen_hashes = set()

    for file_name in json_files:
        try:
            with open(os.path.join(input_dir, file_name), 'r', encoding='utf-8') as f:
                data = json.load(f)
        # ValueError covers malformed JSON and undecodable bytes
        except (OSError, ValueError) as e:
            print(f"‚úó Error processing {file_name}: {str(e)}")
            continue

        samples = data['data'] if isinstance(data, dict) and 'data' in data else data
        if not isinstance(samples, list):
            print(f"‚úó Error processing {file_name}: expected a list of samples")
            continue

        # The file name (without extension) is the fallback family label
        family = os.path.splitext(os.path.basename(file_name))[0]

        for sample in samples:
            if not isinstance(sample, dict):
                continue
            sample_hash = sample.get('sha256_hash')
            # One bad record must not discard the rest of the file
            if not sample_hash or sample_hash in seen_hashes:
                continue
            seen_hashes.add(sample_hash)
            all_samples.append({
                'hash': sample_hash,
                'malware_family': sample.get('signature') or family,
                'source': sample.get('reporter') or 'Unknown'
            })

    # Explicit columns keep the header row even when no sample was found
    output_csv = os.path.join(output_dir, 'hash_signature_output.csv')
    df = pd.DataFrame(all_samples, columns=['hash', 'malware_family', 'source'])
    df.to_csv(output_csv, index=False)
    print(f"‚úì Created CSV file with {len(df)} samples: {output_csv}")
    return output_csv

# Convert JSON to CSV
input_file = convert_json_to_csv()

# Read hashes to process
print("\nüìù Reading hash data...")
hash_data = read_hash_file(input_file)

# Stop if no hashes found
if not hash_data:
    print("‚úó No hashes found. Please check your input file.")
    # `exit` is only injected by the `site` module for interactive sessions;
    # raising SystemExit works in every execution context.
    raise SystemExit(1)

# Show number of hashes and preview first 3
print(f"\nüìä Found {len(hash_data)} samples to analyze")
print("\nPreview of samples:")
for i, h in enumerate(hash_data[:3], 1):
    print(f"  {i}. {h['hash'][:16]}... ({h['family']})")
if len(hash_data) > 3:
    print(f"  ... and {len(hash_data) - 3} more")

# Ask user to confirm before starting data collection.
# Ctrl-C or a closed stdin at the prompt counts as "no" instead of
# surfacing as a traceback.
try:
    response = input("\n‚ö†Ô∏è Proceed with data collection? (yes/no): ").strip().lower()
except (EOFError, KeyboardInterrupt):
    response = ''

if response in ('yes', 'y'):
    # Collect data and save to files
    print("\nüöÄ Starting analysis pipeline...")
    results = collect_sample_data(hash_data, OUTPUT_DIR)

    # Print summary report
    print("\nüìä Generating analysis report...")
    generate_summary_report(results, OUTPUT_DIR)

    print("\n‚ú® Analysis pipeline completed successfully!")
    print("üìÅ Check the output folder for detailed results.")
else:
    print("\n‚ùå Analysis cancelled by user.")

KeyboardInterrupt: 