In [3]:
import os

# Look inside your "Colab Notebooks" folder
print(os.listdir("/content/drive/My Drive/Colab Notebooks"))

['1_spaCy with google mount.ipynb', '.git', '9_EnDynaDemo_ThreatAnalysis.ipynb']


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
%cd /content/drive/My Drive/Colab Notebooks/

/content/drive/My Drive/Colab Notebooks


In [None]:
"""
Cybersecurity Threat Intelligence NLP Pipeline
Using: spaCy, Hugging Face Transformers, NVD API
"""

import re
import requests
import pandas as pd
import numpy as np
from typing import Dict, List
import spacy
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch

# ============================================================================
# 1. SETUP AND INITIALIZATION
# ============================================================================

# Load spaCy for NER and text processing
nlp = spacy.load("en_core_web_sm")

# Load Hugging Face models for threat classification
print("Loading Hugging Face models...")

# Zero-shot classification for threat categorization
classifier = pipeline("zero-shot-classification",
                     model="facebook/bart-large-mnli")

# Sentiment analysis for urgency detection
sentiment_analyzer = pipeline("sentiment-analysis",
                             model="distilbert-base-uncased-finetuned-sst-2-english")

print("Models loaded successfully!\n")

# ============================================================================
# 2. TEXT PREPROCESSING WITH SPACY
# ============================================================================

def preprocess_text(text: str) -> Dict:
    """
    Preprocess threat intelligence text using spaCy
    """
    doc = nlp(text)

    return {
        'original_text': text,
        'sentences': [sent.text for sent in doc.sents],
        'tokens': [token.text for token in doc if not token.is_stop and not token.is_punct],
        'entities': [(ent.text, ent.label_) for ent in doc.ents],
        'noun_chunks': [chunk.text for chunk in doc.noun_chunks]
    }

# ============================================================================
# 3. CVE AND IOC EXTRACTION
# ============================================================================

def extract_cves(text: str) -> List[str]:
    """
    Extract CVE identifiers using regex
    """
    cve_pattern = r'CVE-\d{4}-\d{4,7}'
    cves = re.findall(cve_pattern, text, re.IGNORECASE)
    return list(set([cve.upper() for cve in cves]))

def extract_iocs(text: str) -> Dict[str, List[str]]:
    """
    Extract Indicators of Compromise (IOCs)
    """
    iocs = {
        'ip_addresses': [],
        'domains': [],
        'file_hashes': [],
        'urls': []
    }

    # IP addresses
    ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
    iocs['ip_addresses'] = list(set(re.findall(ip_pattern, text)))

    # Domains (simplified)
    domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
    iocs['domains'] = list(set(re.findall(domain_pattern, text)))

    # MD5 hashes
    md5_pattern = r'\b[a-fA-F0-9]{32}\b'
    iocs['file_hashes'] = list(set(re.findall(md5_pattern, text)))

    # URLs
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
    iocs['urls'] = list(set(re.findall(url_pattern, text)))

    return iocs

# ============================================================================
# 4. THREAT CLASSIFICATION WITH HUGGING FACE
# ============================================================================

def classify_threat_type(text: str) -> Dict:
    """
    Classify threat type using Hugging Face zero-shot classification
    This is where Hugging Face transformers shine!
    """
    # Define threat categories based on MITRE ATT&CK and common threats
    threat_categories = [
        "ransomware",
        "malware",
        "phishing",
        "data breach",
        "DDoS attack",
        "vulnerability exploitation",
        "insider threat",
        "advanced persistent threat",
        "zero-day exploit"
    ]

    # Use Hugging Face transformer for classification
    result = classifier(text, threat_categories, multi_label=True)

    # Return top 3 threat types with confidence scores
    classified = {
        'primary_threat': result['labels'][0],
        'confidence': result['scores'][0],
        'all_threats': dict(zip(result['labels'][:3], result['scores'][:3]))
    }

    return classified

def analyze_urgency(text: str) -> Dict:
    """
    Analyze threat urgency using sentiment analysis
    Negative sentiment + certain keywords = high urgency
    """
    # Get sentiment (positive/negative and confidence)
    sentiment = sentiment_analyzer(text[:512])[0]  # Truncate to model limit

    # Check for urgency keywords
    urgency_keywords = [
        'critical', 'immediate', 'urgent', 'active', 'exploit',
        'actively exploited', 'in the wild', 'zero-day',
        'patch immediately', 'emergency'
    ]

    urgency_count = sum(1 for keyword in urgency_keywords if keyword.lower() in text.lower())

    # Calculate urgency score
    sentiment_score = sentiment['score'] if sentiment['label'] == 'NEGATIVE' else 1 - sentiment['score']
    keyword_score = min(urgency_count / 3, 1.0)  # Normalize to 0-1

    urgency_score = (sentiment_score * 0.4) + (keyword_score * 0.6)

    urgency_level = 'CRITICAL' if urgency_score > 0.8 else \
                   'HIGH' if urgency_score > 0.6 else \
                   'MEDIUM' if urgency_score > 0.4 else 'LOW'

    return {
        'urgency_level': urgency_level,
        'urgency_score': urgency_score,
        'sentiment': sentiment['label'],
        'sentiment_confidence': sentiment['score'],
        'urgency_keywords_found': urgency_count
    }

# ============================================================================
# 5. NVD API ENRICHMENT
# ============================================================================

def enrich_cve_with_nvd(cve_id: str) -> Dict:
    """
    Fetch CVE details from National Vulnerability Database
    """
    try:
        url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}"
        response = requests.get(url, timeout=10)

        if response.status_code == 200:
            data = response.json()

            if 'vulnerabilities' in data and len(data['vulnerabilities']) > 0:
                vuln = data['vulnerabilities'][0]['cve']

                # Extract CVSS score
                cvss_score = 'N/A'
                cvss_severity = 'N/A'
                cvss_vector = 'N/A'

                if 'metrics' in vuln:
                    if 'cvssMetricV31' in vuln['metrics'] and len(vuln['metrics']['cvssMetricV31']) > 0:
                        cvss_data = vuln['metrics']['cvssMetricV31'][0]['cvssData']
                        cvss_score = cvss_data.get('baseScore', 'N/A')
                        cvss_severity = cvss_data.get('baseSeverity', 'N/A')
                        cvss_vector = cvss_data.get('vectorString', 'N/A')

                # Extract description
                description = 'N/A'
                if 'descriptions' in vuln and len(vuln['descriptions']) > 0:
                    description = vuln['descriptions'][0]['value']

                return {
                    'cve_id': cve_id,
                    'description': description,
                    'cvss_score': cvss_score,
                    'cvss_severity': cvss_severity,
                    'cvss_vector': cvss_vector,
                    'status': 'success'
                }

        return {'cve_id': cve_id, 'status': 'not_found'}

    except Exception as e:
        return {'cve_id': cve_id, 'status': 'error', 'error': str(e)}

# ============================================================================
# 6. RISK SCORING AND PRIORITIZATION
# ============================================================================

def calculate_risk_score(cve_data: Dict, threat_classification: Dict, urgency: Dict) -> Dict:
    """
    Calculate overall risk score combining multiple factors
    This mimics your doctoral research on risk assessment frameworks
    """
    # CVSS score (0-10) - 40% weight
    cvss_score = float(cve_data.get('cvss_score', 0)) if cve_data.get('cvss_score') != 'N/A' else 5.0
    cvss_normalized = cvss_score / 10.0

    # Threat type severity - 30% weight
    threat_severity_map = {
        'ransomware': 0.95,
        'zero-day exploit': 1.0,
        'advanced persistent threat': 0.9,
        'data breach': 0.85,
        'vulnerability exploitation': 0.8,
        'malware': 0.75,
        'phishing': 0.7,
        'DDoS attack': 0.65,
        'insider threat': 0.8
    }
    threat_severity = threat_severity_map.get(threat_classification['primary_threat'], 0.5)

    # Urgency - 30% weight
    urgency_score = urgency['urgency_score']

    # Calculate weighted risk score
    risk_score = (
        cvss_normalized * 0.4 +
        threat_severity * 0.3 +
        urgency_score * 0.3
    ) * 100

    # Determine risk level
    risk_level = 'CRITICAL' if risk_score >= 85 else \
                'HIGH' if risk_score >= 70 else \
                'MEDIUM' if risk_score >= 50 else \
                'LOW'

    return {
        'risk_score': round(risk_score, 2),
        'risk_level': risk_level,
        'cvss_contribution': round(cvss_normalized * 40, 2),
        'threat_contribution': round(threat_severity * 30, 2),
        'urgency_contribution': round(urgency_score * 30, 2)
    }

# ============================================================================
# 7. COMPLETE PIPELINE INTEGRATION
# ============================================================================

def process_threat_intelligence(raw_text: str) -> Dict:
    """
    Complete end-to-end threat intelligence processing pipeline
    This is what their production system likely does!
    """
    print("\n" + "="*80)
    print("PROCESSING THREAT INTELLIGENCE")
    print("="*80 + "\n")

    # Step 1: Preprocess with spaCy
    print("Step 1: Preprocessing text with spaCy...")
    preprocessed = preprocess_text(raw_text)
    print(f"  - Extracted {len(preprocessed['sentences'])} sentences")
    print(f"  - Found {len(preprocessed['entities'])} named entities\n")

    # Step 2: Extract CVEs and IOCs
    print("Step 2: Extracting CVEs and IOCs...")
    cves = extract_cves(raw_text)
    iocs = extract_iocs(raw_text)
    print(f"  - CVEs found: {cves}")
    print(f"  - IP addresses: {iocs['ip_addresses']}")
    print(f"  - Domains: {iocs['domains']}\n")

    # Step 3: Classify threat type (Hugging Face!)
    print("Step 3: Classifying threat type with Hugging Face transformers...")
    threat_classification = classify_threat_type(raw_text)
    print(f"  - Primary threat: {threat_classification['primary_threat']}")
    print(f"  - Confidence: {threat_classification['confidence']:.2%}\n")

    # Step 4: Analyze urgency (Hugging Face!)
    print("Step 4: Analyzing urgency with Hugging Face sentiment analysis...")
    urgency = analyze_urgency(raw_text)
    print(f"  - Urgency level: {urgency['urgency_level']}")
    print(f"  - Urgency score: {urgency['urgency_score']:.2f}\n")

    # Step 5: Enrich CVEs with NVD data
    print("Step 5: Enriching CVEs with NVD data...")
    cve_enrichments = []
    for cve in cves:
        print(f"  - Querying NVD for {cve}...")
        enrichment = enrich_cve_with_nvd(cve)
        cve_enrichments.append(enrichment)
        if enrichment['status'] == 'success':
            print(f"    CVSS Score: {enrichment['cvss_score']} ({enrichment['cvss_severity']})")
    print()

    # Step 6: Calculate risk scores
    print("Step 6: Calculating risk scores...")
    risk_assessments = []
    for cve_data in cve_enrichments:
        if cve_data['status'] == 'success':
            risk = calculate_risk_score(cve_data, threat_classification, urgency)
            risk_assessments.append({
                'cve_id': cve_data['cve_id'],
                **risk
            })
            print(f"  - {cve_data['cve_id']}: Risk Score = {risk['risk_score']} ({risk['risk_level']})")

    # Compile final results
    results = {
        'original_text': raw_text,
        'preprocessed': preprocessed,
        'cves': cves,
        'iocs': iocs,
        'threat_classification': threat_classification,
        'urgency_analysis': urgency,
        'cve_details': cve_enrichments,
        'risk_assessments': risk_assessments,
        'timestamp': pd.Timestamp.now().isoformat()
    }

    return results

# ============================================================================
# 8. DEMO WITH REAL THREAT INTELLIGENCE
# ============================================================================

if __name__ == "__main__":
    # Sample threat intelligence report (similar to what they'd process)
    sample_threat_intel = """
    CRITICAL SECURITY ALERT: Active exploitation of CVE-2021-44228 (Log4Shell)

    Threat actors are actively exploiting the Apache Log4j vulnerability to deploy
    ransomware payloads. Multiple ransomware groups including Conti and REvil have
    been observed using this vulnerability for initial access.

    Technical Details:
    - Affected versions: Apache Log4j 2.0-beta9 through 2.14.1
    - Attack vector: Remote code execution via JNDI injection
    - No authentication required
    - CVSS Score: 10.0 (Critical)

    Observed malicious infrastructure:
    - C2 Server: 45.67.89.123
    - Malicious domain: evil-logging-server.com
    - Payload hash: 5d41402abc4b2a76b9719d911017c592

    IMMEDIATE ACTION REQUIRED: Organizations must patch to Log4j 2.17.0 or later
    immediately. This vulnerability is being actively exploited in the wild and
    poses critical risk to enterprise networks.

    Additional CVE-2021-45046 (related bypass) should also be addressed.
    """

    # Process the threat intelligence
    results = process_threat_intelligence(sample_threat_intel)

    # Display summary
    print("\n" + "="*80)
    print("FINAL THREAT INTELLIGENCE SUMMARY")
    print("="*80 + "\n")

    print(f"Threat Type: {results['threat_classification']['primary_threat'].upper()}")
    print(f"Urgency: {results['urgency_analysis']['urgency_level']}")
    print(f"CVEs Identified: {', '.join(results['cves'])}")
    print(f"IOCs Found: {len(results['iocs']['ip_addresses'])} IPs, "
          f"{len(results['iocs']['domains'])} domains")

    if results['risk_assessments']:
        print("\nRisk Assessment:")
        for risk in results['risk_assessments']:
            print(f"  {risk['cve_id']}: {risk['risk_level']} "
                  f"(Score: {risk['risk_score']}/100)")

    print("\n" + "="*80)
    print("PROCESSING COMPLETE")
    print("="*80)

Loading Hugging Face models...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

Device set to use cpu


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Device set to use cpu


Models loaded successfully!


PROCESSING THREAT INTELLIGENCE

Step 1: Preprocessing text with spaCy...
  - Extracted 6 sentences
  - Found 13 named entities

Step 2: Extracting CVEs and IOCs...
  - CVEs found: ['CVE-2021-44228', 'CVE-2021-45046']
  - IP addresses: ['45.67.89.123']
  - Domains: ['evil-logging-server.com']

Step 3: Classifying threat type with Hugging Face transformers...
  - Primary threat: vulnerability exploitation
  - Confidence: 99.67%

Step 4: Analyzing urgency with Hugging Face sentiment analysis...
  - Urgency level: CRITICAL
  - Urgency score: 1.00

Step 5: Enriching CVEs with NVD data...
  - Querying NVD for CVE-2021-44228...
  - Querying NVD for CVE-2021-45046...
    CVSS Score: 9.0 (CRITICAL)

Step 6: Calculating risk scores...
  - CVE-2021-45046: Risk Score = 89.96 (CRITICAL)

FINAL THREAT INTELLIGENCE SUMMARY

Threat Type: VULNERABILITY EXPLOITATION
Urgency: CRITICAL
CVEs Identified: CVE-2021-44228, CVE-2021-45046
IOCs Found: 1 IPs, 1 domains

Risk Assessme

In [5]:
!git add 9_EnDynaDemo_ThreatAnalysis.ipynb

In [7]:
!git config --global user.email "parkd2012@gmail.com"
!git config --global user.name "Dennis Park"

In [8]:
!git commit -m "9_EnDynaDemo_ThreatAnalysis.ipynb initial"

[clean-main 8eee683] 9_EnDynaDemo_ThreatAnalysis.ipynb initial
 1 file changed, 1 insertion(+)
 create mode 100644 9_EnDynaDemo_ThreatAnalysis.ipynb


In [None]:
!git push origin main

Enumerating objects: 4, done.
Counting objects:  25% (1/4)Counting objects:  50% (2/4)Counting objects:  75% (3/4)Counting objects: 100% (4/4)Counting objects: 100% (4/4), done.
Delta compression using up to 2 threads
Compressing objects:  33% (1/3)Compressing objects:  66% (2/3)Compressing objects: 100% (3/3)Compressing objects: 100% (3/3), done.
Writing objects:  33% (1/3)Writing objects:  66% (2/3)Writing objects: 100% (3/3)Writing objects: 100% (3/3), 13.20 KiB | 1.10 MiB/s, done.
Total 3 (delta 0), reused 0 (delta 0), pack-reused 0
To https://github.com/parkd2012/endyna_demo.git
   f3eb289..557d96b  main -> main


In [9]:
!git status

On branch clean-main
Your branch is ahead of 'origin/main' by 1 commit.
  (use "git push" to publish your local commits)

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
	[31mmodified:   9_EnDynaDemo_ThreatAnalysis.ipynb[m

no changes added to commit (use "git add" and/or "git commit -a")


In [10]:
!git add 9_EnDynaDemo_ThreatAnalysis.ipynb

In [11]:
!git status

On branch clean-main
Your branch is ahead of 'origin/main' by 1 commit.
  (use "git push" to publish your local commits)

Changes to be committed:
  (use "git restore --staged <file>..." to unstage)
	[32mmodified:   9_EnDynaDemo_ThreatAnalysis.ipynb[m



In [12]:
!git commit -m "Add updated 9_EnDynaDemo_ThreatAnalysis notebook"

[clean-main 6770275] Add updated 9_EnDynaDemo_ThreatAnalysis notebook
 1 file changed, 1 insertion(+), 1 deletion(-)
 rewrite 9_EnDynaDemo_ThreatAnalysis.ipynb (81%)


In [17]:
!git push -f origin clean-main:main

Enumerating objects: 9, done.
Counting objects: 100% (9/9), done.
Delta compression using up to 2 threads
Compressing objects: 100% (9/9), done.
Writing objects: 100% (9/9), 16.07 KiB | 470.00 KiB/s, done.
Total 9 (delta 3), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (3/3), done.[K
To https://github.com/parkd2012/endyna_demo.git
 + 31ea6c4...6770275 clean-main -> main (forced update)


In [15]:
!git log --oneline

[33m6770275[m[33m ([m[1;36mHEAD -> [m[1;32mclean-main[m[33m)[m Add updated 9_EnDynaDemo_ThreatAnalysis notebook
[33m8eee683[m 9_EnDynaDemo_ThreatAnalysis.ipynb initial
[33m9e671a3[m[33m ([m[1;31morigin/main[m[33m)[m Initial commit without old secrets


In [18]:
!pip install nbformat nbstripout

Collecting nbstripout
  Downloading nbstripout-0.8.1-py2.py3-none-any.whl.metadata (19 kB)
Downloading nbstripout-0.8.1-py2.py3-none-any.whl (16 kB)
Installing collected packages: nbstripout
Successfully installed nbstripout-0.8.1
