<a href="https://colab.research.google.com/github/maphangasinalo14-cmd/ShadowLog_Siem.ipynb/blob/main/Copy_of_Cloud_Sentry_CSPM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =====================================================================
# CLOUDSENTRY - PRODUCTION-READY (COLAB COMPATIBLE)
# Cloud Security Posture Management (CSPM) Tool
# =====================================================================

import sys
import subprocess
import time
import signal
import atexit
from pathlib import Path
from typing import Optional

class CloudSentryLauncher:
    """Manages CloudSentry application lifecycle with Cloud Tunneling"""

    def __init__(self):
        self.streamlit_process: Optional[subprocess.Popen] = None

    def install_dependencies(self):
        """Install required packages"""
        print("üîß Installing dependencies...")
        # Added pyngrok for Cloud Access
        packages = ["pandas", "plotly", "streamlit", "pyngrok"]

        try:
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", "-q"] + packages,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE
            )
            print("‚úÖ Dependencies installed successfully")
        except subprocess.CalledProcessError as e:
            print(f"‚ùå Failed to install dependencies: {e}")
            sys.exit(1)

    def write_security_engine(self):
        """Write the security auditing engine"""
        security_engine_code = '''"""
CloudSentry Security Engine
Performs automated security audits on cloud infrastructure
"""

from dataclasses import dataclass, field
from typing import List, Dict, Any
from enum import Enum

class Severity(Enum):
    """Security finding severity levels"""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"

@dataclass
class Finding:
    """Represents a security finding"""
    resource_id: str
    resource_type: str
    severity: Severity
    description: str
    remediation: str
    compliance_frameworks: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert finding to dictionary"""
        return {
            'resource_id': self.resource_id,
            'resource_type': self.resource_type,
            'severity': self.severity.value,
            'description': self.description,
            'remediation': self.remediation,
            'compliance_frameworks': ', '.join(self.compliance_frameworks)
        }

class CloudSecurityAuditor:
    """Automated cloud security auditor"""

    def __init__(self, data: Dict[str, Any]):
        self.data = data
        self.findings: List[Finding] = []
        self.resources_scanned = 0

    def check_security_groups(self) -> None:
        """Audit Security Groups for risky configurations"""
        security_groups = self.data.get('security_groups', [])

        for sg in security_groups:
            self.resources_scanned += 1
            sg_id = sg.get('id', 'unknown')
            sg_name = sg.get('name', 'unknown')

            for rule in sg.get('rules', []):
                source = rule.get('source', '')
                port = rule.get('port', 0)

                # Check for publicly exposed sensitive ports
                if source == "0.0.0.0/0":
                    if port == 22:
                        self.findings.append(Finding(
                            resource_id=sg_id,
                            resource_type="Security Group",
                            severity=Severity.CRITICAL,
                            description=f"SSH (port 22) exposed to internet in '{sg_name}'",
                            remediation=f"aws ec2 revoke-security-group-ingress --group-id {sg_id} --protocol tcp --port 22 --cidr 0.0.0.0/0",
                            compliance_frameworks=["CIS AWS", "PCI-DSS", "NIST"]
                        ))
                    elif port == 3389:
                        self.findings.append(Finding(
                            resource_id=sg_id,
                            resource_type="Security Group",
                            severity=Severity.CRITICAL,
                            description=f"RDP (port 3389) exposed to internet in '{sg_name}'",
                            remediation=f"aws ec2 revoke-security-group-ingress --group-id {sg_id} --protocol tcp --port 3389 --cidr 0.0.0.0/0",
                            compliance_frameworks=["CIS AWS", "PCI-DSS"]
                        ))
                    elif port == 3306:
                        self.findings.append(Finding(
                            resource_id=sg_id,
                            resource_type="Security Group",
                            severity=Severity.CRITICAL,
                            description=f"MySQL (port 3306) exposed to internet in '{sg_name}'",
                            remediation=f"aws ec2 revoke-security-group-ingress --group-id {sg_id} --protocol tcp --port 3306 --cidr 0.0.0.0/0",
                            compliance_frameworks=["CIS AWS", "PCI-DSS"]
                        ))
                    elif port == 5432:
                        self.findings.append(Finding(
                            resource_id=sg_id,
                            resource_type="Security Group",
                            severity=Severity.CRITICAL,
                            description=f"PostgreSQL (port 5432) exposed to internet in '{sg_name}'",
                            remediation=f"aws ec2 revoke-security-group-ingress --group-id {sg_id} --protocol tcp --port 5432 --cidr 0.0.0.0/0",
                            compliance_frameworks=["CIS AWS", "PCI-DSS"]
                        ))

    def check_s3_encryption(self) -> None:
        """Audit S3 Buckets for encryption and public access"""
        s3_buckets = self.data.get('s3_buckets', [])

        for bucket in s3_buckets:
            self.resources_scanned += 1
            bucket_name = bucket.get('name', 'unknown')
            encryption = bucket.get('encryption', 'None')
            public_access = bucket.get('public_access', False)

            # Check encryption
            if encryption == "None":
                self.findings.append(Finding(
                    resource_id=bucket_name,
                    resource_type="S3 Bucket",
                    severity=Severity.HIGH,
                    description=f"S3 bucket '{bucket_name}' lacks encryption at rest",
                    remediation=f"aws s3api put-bucket-encryption --bucket {bucket_name} --server-side-encryption-configuration AES256",
                    compliance_frameworks=["CIS AWS", "PCI-DSS", "HIPAA", "GDPR"]
                ))

            # Check public access
            if public_access:
                self.findings.append(Finding(
                    resource_id=bucket_name,
                    resource_type="S3 Bucket",
                    severity=Severity.CRITICAL,
                    description=f"S3 bucket '{bucket_name}' allows public access",
                    remediation=f"aws s3api put-public-access-block --bucket {bucket_name} --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true",
                    compliance_frameworks=["CIS AWS", "PCI-DSS", "HIPAA", "GDPR"]
                ))

    def check_iam_mfa(self) -> None:
        """Audit IAM Users for MFA and password policies"""
        iam_users = self.data.get('iam_users', [])

        for user in iam_users:
            self.resources_scanned += 1
            username = user.get('name', 'unknown')
            mfa_enabled = user.get('mfa_enabled', False)
            has_console_access = user.get('console_access', False)

            # Check MFA for console users
            if has_console_access and not mfa_enabled:
                self.findings.append(Finding(
                    resource_id=username,
                    resource_type="IAM User",
                    severity=Severity.HIGH,
                    description=f"IAM user '{username}' has console access without MFA",
                    remediation=f"aws iam enable-mfa-device --user-name {username} --serial-number arn:aws:iam::ACCOUNT-ID:mfa/{username} --authentication-code-1 CODE1 --authentication-code-2 CODE2",
                    compliance_frameworks=["CIS AWS", "PCI-DSS", "NIST"]
                ))

    def check_ec2_instances(self) -> None:
        """Audit EC2 instances for security best practices"""
        ec2_instances = self.data.get('ec2_instances', [])

        for instance in ec2_instances:
            self.resources_scanned += 1
            instance_id = instance.get('id', 'unknown')
            monitoring = instance.get('monitoring_enabled', False)
            imdsv2 = instance.get('imdsv2_required', False)

            # Check monitoring
            if not monitoring:
                self.findings.append(Finding(
                    resource_id=instance_id,
                    resource_type="EC2 Instance",
                    severity=Severity.MEDIUM,
                    description=f"EC2 instance '{instance_id}' lacks detailed monitoring",
                    remediation=f"aws ec2 monitor-instances --instance-ids {instance_id}",
                    compliance_frameworks=["CIS AWS"]
                ))

            # Check IMDSv2
            if not imdsv2:
                self.findings.append(Finding(
                    resource_id=instance_id,
                    resource_type="EC2 Instance",
                    severity=Severity.MEDIUM,
                    description=f"EC2 instance '{instance_id}' not enforcing IMDSv2",
                    remediation=f"aws ec2 modify-instance-metadata-options --instance-id {instance_id} --http-tokens required --http-put-response-hop-limit 1",
                    compliance_frameworks=["CIS AWS"]
                ))

    def run_audit(self) -> List[Finding]:
        """Execute all security checks"""
        self.findings = []
        self.resources_scanned = 0

        print("üîç Running security audit...")
        self.check_security_groups()
        self.check_s3_encryption()
        self.check_iam_mfa()
        self.check_ec2_instances()

        print(f"‚úÖ Audit complete: {self.resources_scanned} resources scanned")
        print(f"‚ö†Ô∏è  Found {len(self.findings)} security findings")

        return self.findings
'''

        try:
            Path("security_engine.py").write_text(security_engine_code)
            print("‚úÖ Security engine created")
        except IOError as e:
            print(f"‚ùå Failed to write security engine: {e}")
            sys.exit(1)

    def write_dashboard_app(self):
        """Write the Streamlit dashboard application"""
        dashboard_code = '''"""
CloudSentry Dashboard
Interactive web interface for cloud security auditing
"""

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from security_engine import CloudSecurityAuditor, Severity

# Mock AWS Infrastructure Data
MOCK_AWS_DATA = {
    "security_groups": [
        {
            "id": "sg-0a1b2c3d",
            "name": "web-tier-sg",
            "rules": [
                {"protocol": "tcp", "port": 443, "source": "0.0.0.0/0"},
                {"protocol": "tcp", "port": 80, "source": "0.0.0.0/0"}
            ]
        },
        {
            "id": "sg-4e5f6g7h",
            "name": "admin-access-sg",
            "rules": [
                {"protocol": "tcp", "port": 22, "source": "0.0.0.0/0"},
                {"protocol": "tcp", "port": 3389, "source": "0.0.0.0/0"}
            ]
        },
        {
            "id": "sg-8i9j0k1l",
            "name": "database-sg",
            "rules": [
                {"protocol": "tcp", "port": 3306, "source": "0.0.0.0/0"},
                {"protocol": "tcp", "port": 5432, "source": "0.0.0.0/0"}
            ]
        }
    ],
    "s3_buckets": [
        {"name": "company-financial-records", "encryption": "None", "public_access": True},
        {"name": "employee-pii-data", "encryption": "None", "public_access": False},
        {"name": "public-website-assets", "encryption": "AES256", "public_access": True},
        {"name": "secure-backups", "encryption": "KMS", "public_access": False}
    ],
    "iam_users": [
        {"name": "admin_john", "mfa_enabled": False, "console_access": True},
        {"name": "developer_sarah", "mfa_enabled": False, "console_access": True},
        {"name": "audit_bot", "mfa_enabled": True, "console_access": False},
        {"name": "readonly_viewer", "mfa_enabled": True, "console_access": True}
    ],
    "ec2_instances": [
        {"id": "i-0abc123def456", "monitoring_enabled": False, "imdsv2_required": False},
        {"id": "i-0xyz789ghi012", "monitoring_enabled": True, "imdsv2_required": False},
        {"id": "i-0mno345pqr678", "monitoring_enabled": True, "imdsv2_required": True}
    ]
}

def calculate_security_score(findings_df):
    """Calculate overall security score"""
    if findings_df.empty:
        return 100

    severity_weights = {
        'CRITICAL': 20,
        'HIGH': 10,
        'MEDIUM': 5,
        'LOW': 2,
        'INFO': 1
    }

    total_deduction = sum(
        severity_weights.get(row['severity'], 0)
        for _, row in findings_df.iterrows()
    )

    return max(0, 100 - total_deduction)

def render_severity_badge(severity):
    """Render colored severity badge"""
    colors = {
        'CRITICAL': '#ff4b4b',
        'HIGH': '#ff8c00',
        'MEDIUM': '#ffa500',
        'LOW': '#90ee90',
        'INFO': '#87ceeb'
    }
    color = colors.get(severity, '#cccccc')
    return f'<span style="background-color: {color}; color: white; padding: 4px 12px; border-radius: 12px; font-weight: bold; font-size: 0.85em;">{severity}</span>'

# Page Configuration
st.set_page_config(
    page_title="CloudSentry CSPM",
    page_icon="‚òÅÔ∏è",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS
st.markdown("""
<style>
    .main-header {
        font-size: 3rem;
        font-weight: 700;
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        -webkit-background-clip: text;
        -webkit-text-fill-color: transparent;
        margin-bottom: 0;
    }
    .stMetric {
        background-color: #f8f9fa;
        padding: 15px;
        border-radius: 10px;
        border-left: 4px solid #667eea;
    }
</style>
""", unsafe_allow_html=True)

# Header
st.markdown('<h1 class="main-header">‚òÅÔ∏è CloudSentry</h1>', unsafe_allow_html=True)
st.caption("üõ°Ô∏è Automated Cloud Security Posture Management (CSPM) Platform")
st.divider()

# Sidebar
with st.sidebar:
    st.header("‚öôÔ∏è Configuration")

    scan_option = st.radio(
        "Scan Scope",
        ["Full Infrastructure", "Security Groups Only", "Storage Only", "IAM Only"],
        index=0
    )

    compliance_filter = st.multiselect(
        "Compliance Frameworks",
        ["CIS AWS", "PCI-DSS", "HIPAA", "GDPR", "NIST", "SOC2"],
        default=["CIS AWS", "PCI-DSS"]
    )

    st.divider()

    if st.button("üîç Run Security Audit", type="primary", use_container_width=True):
        st.session_state.run_audit = True

    st.divider()
    st.caption("Built with Streamlit ‚Ä¢ CloudSentry v2.0")

# Initialize audit
if 'run_audit' not in st.session_state:
    st.session_state.run_audit = True

if st.session_state.run_audit:
    with st.spinner("Running comprehensive security audit..."):
        auditor = CloudSecurityAuditor(MOCK_AWS_DATA)
        findings = auditor.run_audit()

        # Convert to DataFrame
        findings_dicts = [f.to_dict() for f in findings]
        df = pd.DataFrame(findings_dicts)

        # Calculate metrics
        total_resources = auditor.resources_scanned
        total_findings = len(findings)
        security_score = calculate_security_score(df)

        critical_count = len(df[df['severity'] == 'CRITICAL']) if not df.empty else 0
        high_count = len(df[df['severity'] == 'HIGH']) if not df.empty else 0

# Metrics Dashboard
col1, col2, col3, col4 = st.columns(4)

with col1:
    st.metric(
        label="Resources Scanned",
        value=total_resources,
        delta=None
    )

with col2:
    st.metric(
        label="Security Score",
        value=f"{security_score}/100",
        delta=f"{security_score - 75}%" if security_score < 75 else "Good",
        delta_color="normal" if security_score >= 75 else "inverse"
    )

with col3:
    st.metric(
        label="Critical Risks",
        value=critical_count,
        delta=f"-{critical_count}" if critical_count > 0 else "None",
        delta_color="inverse" if critical_count > 0 else "off"
    )

with col4:
    st.metric(
        label="High Risks",
        value=high_count,
        delta=f"-{high_count}" if high_count > 0 else "None",
        delta_color="inverse" if high_count > 0 else "off"
    )

st.divider()

# Main Content
if not df.empty:
    # Create tabs
    tab1, tab2, tab3 = st.tabs(["üìä Overview", "üö® Findings", "üìã Remediation Plan"])

    with tab1:
        col_left, col_right = st.columns(2)

        with col_left:
            st.subheader("Severity Distribution")
            severity_counts = df['severity'].value_counts()
            fig_pie = px.pie(
                values=severity_counts.values,
                names=severity_counts.index,
                title='Findings by Severity',
                color=severity_counts.index,
                color_discrete_map={
                    'CRITICAL': '#ff4b4b',
                    'HIGH': '#ff8c00',
                    'MEDIUM': '#ffa500',
                    'LOW': '#90ee90',
                    'INFO': '#87ceeb'
                }
            )
            fig_pie.update_traces(textposition='inside', textinfo='percent+label')
            st.plotly_chart(fig_pie, use_container_width=True)

        with col_right:
            st.subheader("Resource Type Analysis")
            resource_counts = df['resource_type'].value_counts()
            fig_bar = px.bar(
                x=resource_counts.index,
                y=resource_counts.values,
                title='Findings by Resource Type',
                labels={'x': 'Resource Type', 'y': 'Number of Findings'},
                color=resource_counts.values,
                color_continuous_scale='Reds'
            )
            fig_bar.update_layout(showlegend=False)
            st.plotly_chart(fig_bar, use_container_width=True)

    with tab2:
        st.subheader("üö® Security Findings")

        # Severity filter
        severity_filter = st.multiselect(
            "Filter by Severity",
            options=df['severity'].unique(),
            default=df['severity'].unique()
        )

        filtered_df = df[df['severity'].isin(severity_filter)]

        # Display findings
        for idx, row in filtered_df.iterrows():
            with st.expander(
                f"{render_severity_badge(row['severity'])} {row['description']}",
                expanded=(row['severity'] == 'CRITICAL')
            ):
                col_a, col_b = st.columns([2, 1])

                with col_a:
                    st.markdown(f"**Resource:** `{row['resource_id']}`")
                    st.markdown(f"**Type:** {row['resource_type']}")
                    st.markdown(f"**Compliance:** {row['compliance_frameworks']}")

                with col_b:
                    st.markdown("**Risk Level**")
                    st.markdown(render_severity_badge(row['severity']), unsafe_allow_html=True)

                st.markdown("---")
                st.markdown("**üîß Remediation Command:**")
                st.code(row['remediation'], language="bash")

    with tab3:
        st.subheader("üìã Automated Remediation Plan")

        st.info("The following script will remediate all critical and high severity findings. Review carefully before execution.")

        remediation_script = "#!/bin/bash\\n# CloudSentry Auto-Remediation Script\\n# Generated: " + pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S") + "\\n\\n"

        critical_high = df[df['severity'].isin(['CRITICAL', 'HIGH'])]

        for idx, row in critical_high.iterrows():
            remediation_script += f"# {row['severity']}: {row['description']}\\n"
            remediation_script += f"{row['remediation']}\\n\\n"

        st.code(remediation_script, language="bash")

        st.download_button(
            label="‚¨áÔ∏è Download Remediation Script",
            data=remediation_script,
            file_name="cloudsentry_remediation.sh",
            mime="text/plain"
        )

else:
    st.success("‚úÖ **Excellent!** No security vulnerabilities detected.")
    st.balloons()

    st.markdown("""
    ### üéâ Your cloud infrastructure is secure!

    All scanned resources meet security best practices:
    - No publicly exposed sensitive ports
    - Encryption enabled on all storage
    - MFA enforced for all users
    - Monitoring enabled on compute resources
    """)

# Footer
st.divider()
st.caption("‚ö° Powered by CloudSentry ‚Ä¢ Stay Secure, Stay Compliant")
'''

        try:
            Path("dashboard_app.py").write_text(dashboard_code)
            print("‚úÖ Dashboard application created")
        except IOError as e:
            print(f"‚ùå Failed to write dashboard: {e}")
            sys.exit(1)

    def cleanup(self):
        """Clean up resources"""
        if self.streamlit_process:
            print("\nüõë Shutting down CloudSentry...")
            self.streamlit_process.terminate()
            try:
                self.streamlit_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.streamlit_process.kill()
            print("‚úÖ Cleanup complete")

    def start_streamlit(self):
        """Launch Streamlit application with Tunnel"""
        print("üöÄ Starting CloudSentry Dashboard...")

        # Kill any existing ngrok processes
        try:
            from pyngrok import ngrok
            ngrok.kill()
        except:
            pass

        try:
            # 1. Start Streamlit headless
            self.streamlit_process = subprocess.Popen(
                [sys.executable, "-m", "streamlit", "run", "dashboard_app.py",
                 "--server.headless=true",
                 "--server.port=8501",
                 "--browser.gatherUsageStats=false"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            time.sleep(3)

            # 2. Open Tunnel
            from pyngrok import ngrok
            # You can uncomment and add token if you have one:
            # ngrok.set_auth_token("YOUR_TOKEN_HERE")
            public_url = ngrok.connect(8501).public_url

            print("\n" + "="*60)
            print("‚úÖ CloudSentry is running!")
            print(f"üîó CLICK THIS LINK: {public_url}")
            print("="*60)
            print("\nüí° Press Ctrl+C to stop the server\n")

        except Exception as e:
            print(f"‚ùå Failed to start Streamlit: {e}")
            sys.exit(1)

    def run(self):
        """Main execution flow"""
        atexit.register(self.cleanup)
        signal.signal(signal.SIGINT, lambda s, f: sys.exit(0))

        self.install_dependencies()
        self.write_security_engine()
        self.write_dashboard_app()
        self.start_streamlit()

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nüëã Goodbye!")
            sys.exit(0)

if __name__ == "__main__":
    launcher = CloudSentryLauncher()
    launcher.run()

üîß Installing dependencies...
‚úÖ Dependencies installed successfully
‚úÖ Security engine created
‚úÖ Dashboard application created
üöÄ Starting CloudSentry Dashboard...

‚úÖ CloudSentry is running!
üîó CLICK THIS LINK: https://uninvestigable-roxane-scablike.ngrok-free.dev

üí° Press Ctrl+C to stop the server

