In [27]:
import os
from dotenv import load_dotenv
load_dotenv()
from urllib.parse import urlparse
import requests
import logging
from typing import List, Dict
import base64

In [17]:
VIRUSTOTAL_API_KEY = os.getenv('VIRUSTOTAL_API_KEY')
SHODAN_API_KEY=os.getenv('SHODAN_API_KEY')
logger = logging.getLogger(__name__)

In [None]:
def collect_virustotal_threats(url:str) -> List[Dict]:
    try:
        url_encode=base64.urlsafe_b64encode(url.encode()).decode().strip('=')
        headers = {
            'x-apikey': VIRUSTOTAL_API_KEY
        }
        url_id = requests.get(
            f'https://www.virustotal.com/api/v3/urls/{url_encode}',
            headers=headers
        ).json()
        analysis_response = requests.get(
            f'https://www.virustotal.com/api/v3/analyses/{url_id["data"]["id"]}',
            headers=headers
        ).json()
        return {
            'malicious_count': analysis_response.get('malicious', 0),
            'suspicious_count': analysis_response.get('suspicious', 0),
            'harmless_count': analysis_response.get('harmless', 0),
            'undetected_count': analysis_response.get('undetected', 0)
        }
    
    except Exception as e:
        logger.error(f"VirusTotal Analysis Error: {e}")
        return {'error': str(e)}

In [16]:
collect_virustotal_threats("https://example.com")

aHR0cHM6Ly9leGFtcGxlLmNvbQ
{'data': {'id': '0f115db062b7c0dd030b16878c99dea5c354b49dc37b38eb8846179c7783e9d7', 'type': 'url', 'links': {'self': 'https://www.virustotal.com/api/v3/urls/0f115db062b7c0dd030b16878c99dea5c354b49dc37b38eb8846179c7783e9d7'}, 'attributes': {'last_http_response_content_sha256': 'ea8fac7c65fb589b0d53560f5251f74f9e9b243478dcb6b3ea79b5e36449c8d9', 'url': 'https://example.com/', 'redirection_chain': ['https://example.com/'], 'threat_names': [], 'targeted_brand': {'SafeToOpen': ''}, 'last_http_response_code': 200, 'total_votes': {'harmless': 13, 'malicious': 2}, 'crowdsourced_context': [{'source': 'ArcSight Threat Intelligence', 'timestamp': 1692891969, 'details': "Contextual Indicators: The domain’s Cisco Umbrella rank is 8898\n\nContextual Indicators: The URL is known benign by Check Point's Threat Cloud\n\nContextual Indicators: The domain is popular among websites with good reputation\n\nContextual Indicators: The domain is popular in the world\n\nCreated On: 19

{'malicious_count': 0,
 'suspicious_count': 0,
 'harmless_count': 0,
 'undetected_count': 0}

In [None]:
def shodan_host_analysis(domain: str) -> List[Dict]:
    try:
        host_info_response = requests.get(
            f'https://api.shodan.io/dns/resolve',
            params={
                'key': SHODAN_API_KEY,
                'hostnames': domain
            }
        )
        if host_info_response.status_code == 200:
            ip = list(host_info_response.json().values())[0]
            host_details_response = requests.get(
                f'https://api.shodan.io/shodan/host/{ip}',
                params={'key': SHODAN_API_KEY}
            )
            if host_details_response.status_code == 200:
                host_data = host_details_response.json()
                
                return {
                    'ip': ip,
                    'total_ports_open': len(host_data.get('ports', [])),
                    'vulnerable_services': [
                        service for service in host_data.get('data', []) 
                        if 'vulners' in str(service).lower()
                    ],
                    'country': host_data.get('country_name', 'Unknown'),
                    'organization': host_data.get('org', 'Unknown'),
                    'last_update': host_data.get('last_update', 'Unknown')
                }
        
        return {
            'error': 'Unable to resolve host or retrieve Shodan information'
        }
    
    except Exception as e:
        logger.error(f"Shodan Analysis Error: {e}")
        return {'error': str(e)}

In [30]:
shodan_host_analysis("google.com")

<Response [200]>


{'ip': '142.250.65.174',
 'total_ports_open': 2,
 'vulnerable_services': [],
 'country': 'United States',
 'organization': 'Google LLC',
 'last_update': '2025-03-04T17:47:37.933234'}

In [32]:
url="https://google.com"
parsed_url = urlparse(url)
domain = parsed_url.netloc
domain


'google.com'

In [26]:
def calculate_risk_score(vt_result: Dict, shodan_result: Dict) -> float:
    try:
        vt_risk = (
            (vt_result.get('malicious_count', 0) * 2 + 
                vt_result.get('suspicious_count', 0)) / 
            (vt_result.get('total_engines', 1) / 2)
        )
        shodan_risk = 0
        if shodan_result.get('vulnerable_services'):
            shodan_risk += len(shodan_result.get('vulnerable_services', [])) * 0.5
        if shodan_result.get('total_ports_open', 0) > 10:
            shodan_risk += 1
        total_risk = min(max(vt_risk + shodan_risk, 0), 10)
        return round(total_risk, 2)
    except Exception as e:
        logger.error(f"Risk Score Calculation Error: {e}")

In [34]:
url="https://google.com"
parsed_url = urlparse(url)
domain = parsed_url.netloc
vt_result=collect_virustotal_threats(url)
shodan_result=shodan_host_analysis(domain)
print(vt_result)
print(shodan_result)
calculate_risk_score(vt_result,shodan_result)

aHR0cHM6Ly9nb29nbGUuY29t
{'data': {'id': '9d116b1b0c1200ca75016e4c010bc94836366881b021a658ea7f8548b6543c1e', 'type': 'url', 'links': {'self': 'https://www.virustotal.com/api/v3/urls/9d116b1b0c1200ca75016e4c010bc94836366881b021a658ea7f8548b6543c1e'}, 'attributes': {'first_submission_date': 1308916568, 'last_analysis_date': 1741163102, 'last_http_response_content_sha256': '7b1ede2040ba733b09ebf0aeca54e6fd9760989905310cf2ea17f671c4684000', 'tld': 'com', 'last_final_url': 'https://www.google.com/', 'html_meta': {'referrer': ['origin']}, 'outgoing_links': ['https://about.google/?fg=1&utm_source=google-US&utm_medium=referral&utm_campaign=hp-header', 'https://store.google.com/US?utm_source=hp_header&utm_medium=google_ooo&utm_campaign=GS100042&hl=en-US', 'https://mail.google.com/mail/&ogbl', 'https://www.google.com/imghp?hl=en&ogbl', 'https://www.google.com/intl/en/about/products', 'https://accounts.google.com/ServiceLogin?hl=en&passive=true&continue=https://www.google.com/&ec=GAZAmgQ', 'https

0.0

In [35]:
combined_result = {
                'url': url,
                'domain': domain,
                'virustotal': vt_result,
                'shodan': shodan_result,
                'overall_risk_score': calculate_risk_score(vt_result, shodan_result)
            }

In [None]:
from mongoengine import Document, StringField, DateTimeField, ListField, FloatField
from datetime import datetime

class ThreatIntelligence(Document):
    source = StringField(required=True)
    threat_type = StringField(required=True)
    severity = FloatField(required=True)
    indicators = ListField(StringField())
    description = StringField()
    timestamp = DateTimeField(default=datetime.utcnow)
    
    meta = {
        'collection': 'threat_intelligence',
        'indexes': [
            'source',
            'threat_type',
            '-timestamp'
        ]
    }

In [40]:
def save_url_threat(threat_data: Dict):
    try:
        threat = ThreatIntelligence(
            source='URL Analysis',
            threat_type='Web Threat',
            severity=threat_data.get('overall_risk_score', 0),
            indicators=[
                threat_data.get('url', ''),
                threat_data.get('domain', '')
            ],
            description=f"VirusTotal: {threat_data.get('virustotal', {})} | " 
                        f"Shodan: {threat_data.get('shodan', {})}"
        )
        threat.save()
    except Exception as e:
        logger.error(f"Threat Saving Error: {e}")

In [41]:
save_url_threat(combined_result)

Threat Saving Error: You have not defined a default connection
