<a href="https://colab.research.google.com/github/maphangasinalo14-cmd/ShadowLog_Siem.ipynb/blob/main/ShadowLog_Siem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =====================================================================
# SHADOWLOG - DEBUG MODE (VERBOSE)
# =====================================================================

import sys
import subprocess
import time
import signal
import atexit
from pathlib import Path
from typing import Optional

class ShadowLogLauncher:
    """Bootstrap and run the ShadowLog demo SIEM from a single script/notebook cell.

    The launcher installs the Python dependencies, writes the three
    application modules (``log_gen.py``, ``analyzer.py``, ``app.py``) into the
    current working directory, starts Streamlit as a child process and exposes
    it through an ngrok tunnel.

    Args:
        port: TCP port Streamlit listens on and ngrok forwards to.
    """

    # Combined stdout/stderr of the Streamlit child process ends up here.
    LOG_FILE = "streamlit.log"

    def __init__(self, port: int = 8501):
        self.port = port
        # Handle of the Streamlit child process; None until run() starts it.
        self.process: Optional[subprocess.Popen] = None

    def install_dependencies(self):
        """Install the required packages with pip for the running interpreter.

        Exits the interpreter with status 1 if pip reports a failure.
        """
        print("üîß Installing dependencies (this may take 1 minute)...")
        # pip output is intentionally not suppressed so progress stays visible.
        packages = ["pandas", "plotly", "streamlit", "scikit-learn", "faker", "numpy", "pyngrok"]
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install"] + packages)
            print("‚úÖ Dependencies installed")
        except subprocess.CalledProcessError as e:
            print(f"‚ùå Installation failed: {e}")
            sys.exit(1)

    def write_files(self):
        """Write ``log_gen.py``, ``analyzer.py`` and ``app.py`` to the working directory.

        The module sources are embedded below as (non-raw) string templates,
        so backslashes that must survive into the generated files are doubled.
        Files are written as UTF-8 explicitly because ``app.py`` contains
        non-ASCII characters and Python reads source files as UTF-8.
        """
        print("üìù Writing application files...")

        # 1. LOG GENERATOR: synthetic access log with normal traffic, one
        #    brute-force source and five SQL-injection sources.
        log_gen = '''import random, json
from datetime import datetime, timedelta
from faker import Faker
from pathlib import Path

fake = Faker()

ATTACKS = {
    "sql": ["/api?id=1' OR '1'='1", "/login?u=admin'--", "/q?sql=UNION SELECT"],
    "xss": ["/msg?t=<script>alert(1)</script>", "/bio?v=<img src=x onerror=alert(1)>"],
    "path": ["/file?p=../../etc/passwd", "/view?f=../../../root/.ssh/id_rsa"],
    "brute": ["/admin/login", "/wp-login.php", "/ssh"]
}

NORMAL = ["/", "/home", "/about", "/api/data", "/img/logo.png", "/products"]

def generate_logs(n=5000):
    logs, attackers = [], set()

    # Normal traffic
    for _ in range(int(n*0.70)):
        ip, ts = fake.ipv4(), fake.date_time_between('-2d').strftime('%d/%b/%Y:%H:%M:%S +0000')
        path, status = random.choice(NORMAL), random.choice([200,200,200,404])
        logs.append(f'{ip} - - [{ts}] "GET {path} HTTP/1.1" {status} {random.randint(300,5000)} "-" "Mozilla/5.0"')

    # Brute Force
    attacker = "203.0.113.55"
    attackers.add(attacker)
    base = datetime.now()-timedelta(hours=2)
    for i in range(500):
        ts = (base+timedelta(seconds=i)).strftime('%d/%b/%Y:%H:%M:%S +0000')
        logs.append(f'{attacker} - - [{ts}] "POST {random.choice(ATTACKS["brute"])} HTTP/1.1" 401 120 "-" "Mozilla/5.0"')

    # SQL Injection
    for i in range(5):
        ip = f"198.51.100.{100+i}"
        attackers.add(ip)
        for _ in range(50):
            ts = fake.date_time_between('-2d').strftime('%d/%b/%Y:%H:%M:%S +0000')
            logs.append(f'{ip} - - [{ts}] "GET {random.choice(ATTACKS["sql"])} HTTP/1.1" 200 450 "-" "sqlmap/1.5"')

    random.shuffle(logs)
    Path("server_access.log").write_text("\\n".join(logs))
    with open("metadata.json", "w") as meta_file:
        json.dump({"total": len(logs), "attack_ips": list(attackers)}, meta_file)
    print(f"‚úÖ Generated {len(logs)} logs")

if __name__ == "__main__":
    generate_logs()
'''

        # 2. ANALYZER: parses the access log, flags signature matches and runs
        #    an IsolationForest over per-IP aggregate features.
        analyzer = '''import re, pandas as pd, numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

class LogAnalyzer:
    PATTERN = re.compile(r'(?P<ip>\\d+\\.\\d+\\.\\d+\\.\\d+) - - \\[(?P<ts>.*?)\\] "(?P<method>\\w+) (?P<path>.*?) HTTP/1\\.1" (?P<status>\\d+) (?P<size>\\d+)')

    def __init__(self, file="server_access.log"):
        self.file = file

    def parse(self):
        data = []
        with open(self.file) as f:
            for line in f:
                m = self.PATTERN.search(line)
                if m: data.append(m.groupdict())

        df = pd.DataFrame(data)
        df['status'] = pd.to_numeric(df['status'])
        df['size'] = pd.to_numeric(df['size'])
        df['ts'] = pd.to_datetime(df['ts'], format='%d/%b/%Y:%H:%M:%S %z')

        df['sql_inj'] = df['path'].str.contains(r"(?i)(union|select|drop|'|--)", regex=True)
        df['xss'] = df['path'].str.contains(r"(?i)(<script|onerror|alert)", regex=True)
        df['path_trav'] = df['path'].str.contains(r"\\.\\./", regex=True)
        df['has_attack'] = df['sql_inj'] | df['xss'] | df['path_trav']

        return df

    def analyze(self):
        raw = self.parse()
        features = raw.groupby('ip').agg(
            requests=('path','count'),
            unique_paths=('path','nunique'),
            error_rate=('status', lambda x: (x>=400).mean()),
            avg_size=('size','mean'),
            req_per_min=('ts', lambda x: len(x)/max(1,(x.max()-x.min()).total_seconds()/60)),
            has_sig=('has_attack', 'any')
        ).reset_index().fillna(0)

        X = features[['requests','unique_paths','error_rate','avg_size','req_per_min']].values
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        model = IsolationForest(contamination=0.15, random_state=42)
        features['ml_anomaly'] = model.fit_predict(X_scaled)
        features['is_threat'] = (features['ml_anomaly']==-1) | (features['has_sig']==True)

        def level(r):
            if not r['is_threat']: return 'NORMAL'
            if r['has_sig']: return 'CRITICAL'
            if r['error_rate']>0.7 and r['requests']>100: return 'CRITICAL'
            if r['req_per_min']>5: return 'HIGH'
            return 'MEDIUM'

        features['threat_level'] = features.apply(level, axis=1)
        return raw, features

def analyze_logs():
    return LogAnalyzer().analyze()
'''

        # 3. DASHBOARD: Streamlit front end over the analyzer output.
        dashboard = '''import streamlit as st
import pandas as pd
import plotly.express as px
import os, json
from log_gen import generate_logs
from analyzer import analyze_logs

st.set_page_config(page_title="ShadowLog SIEM", page_icon="üïµÔ∏è", layout="wide")

st.markdown("""<style>
div[data-testid="stMetric"] {border: 1px solid #444; padding: 10px; border-radius: 5px;}
</style>""", unsafe_allow_html=True)

st.title("üïµÔ∏è ShadowLog SIEM")
st.divider()

with st.sidebar:
    size = st.selectbox("Log Size", [1000,3000,5000], index=1)
    if st.button("üîÑ Generate Logs"):
        generate_logs(size)
        st.cache_data.clear()
        st.rerun()
    threat_filter = st.multiselect("Filter", ["CRITICAL","HIGH","MEDIUM"], ["CRITICAL","HIGH"])
    min_req = st.slider("Min Requests", 0, 500, 1)

if not os.path.exists("server_access.log"):
    generate_logs(3000)

@st.cache_data(ttl=300)
def load_data():
    return analyze_logs()

with st.spinner("Analyzing..."):
    raw, threats = load_data()

c1,c2,c3 = st.columns(3)
c1.metric("Total Events", len(raw))
c2.metric("Threats", len(threats[threats['is_threat']]))
c3.metric("Critical", len(threats[threats['threat_level']=='CRITICAL']))

st.divider()
filtered = threats[
    (threats['threat_level'].isin(threat_filter)) &
    (threats['requests'] >= min_req) &
    (threats['is_threat'])
].sort_values('requests', ascending=False)

if filtered.empty:
    st.info("No threats found")
else:
    for _, row in filtered.iterrows():
        with st.expander(f"üî¥ {row['threat_level']} - {row['ip']}"):
            st.write(f"Error Rate: {row['error_rate']:.1%}")
            st.code(raw[raw['ip']==row['ip']]['path'].head(5).tolist())
'''

        # Explicit UTF-8: the locale default may be unable to encode the
        # non-ASCII characters in app.py, or may produce a file that Python
        # cannot decode when importing it.
        Path("log_gen.py").write_text(log_gen, encoding="utf-8")
        Path("analyzer.py").write_text(analyzer, encoding="utf-8")
        Path("app.py").write_text(dashboard, encoding="utf-8")
        print("‚úÖ Files created")

    def _stop(self) -> None:
        """Terminate the Streamlit child process if it is still running."""
        proc = self.process
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The child ignored the polite request; force it down.
            proc.kill()
            proc.wait()

    def run(self):
        """Install, generate, launch Streamlit, open the tunnel and block.

        Blocks until Streamlit exits or the user interrupts with Ctrl-C, then
        shuts down the child process and the ngrok tunnel.
        """
        # Safety net in case the interpreter exits through another path.
        atexit.register(self._stop)

        self.install_dependencies()
        self.write_files()

        print("üöÄ Starting Streamlit...")
        # Imported here because pyngrok is only available after the install step.
        from pyngrok import ngrok
        ngrok.kill()

        # Send the child's output to a file instead of pipes: pipes that are
        # never drained fill up and eventually block the child on write.
        with open(self.LOG_FILE, "wb") as log_file:
            self.process = subprocess.Popen(
                [sys.executable, "-m", "streamlit", "run", "app.py",
                 "--server.headless=true", f"--server.port={self.port}",
                 "--browser.gatherUsageStats=false"],
                stdout=log_file,
                stderr=subprocess.STDOUT,
            )

        # DEBUG CHECK: wait 5 seconds and see if it crashed
        time.sleep(5)
        if self.process.poll() is not None:
            print("\n‚ùå STREAMLIT CRASHED IMMEDIATELY!")
            print("Error logs:")
            print(Path(self.LOG_FILE).read_text(encoding="utf-8", errors="replace"))
            return

        print("‚úÖ Streamlit is running in background.")

        try:
            # Start Ngrok
            try:
                public_url = ngrok.connect(self.port).public_url
                print("\n" + "="*60)
                print(f"‚úÖ SHADOWLOG IS LIVE: {public_url}")
                print("="*60)
            except Exception as e:
                print(f"‚ùå Ngrok Error: {e}")
                print("Note: If you didn't add an Auth Token, Ngrok might limit your connections.")

            # Keep alive for as long as the dashboard itself is alive.
            while self.process.poll() is None:
                time.sleep(1)
            print(f"Streamlit exited with code {self.process.returncode}; see {self.LOG_FILE}")
        except KeyboardInterrupt:
            print("Stopped.")
        finally:
            # In a notebook kernel the interpreter does not exit on Ctrl-C, so
            # clean up explicitly rather than relying on atexit alone.
            self._stop()
            ngrok.kill()
# Script entry point: build a launcher and run the full pipeline.
if __name__ == "__main__":
    launcher = ShadowLogLauncher()
    launcher.run()

üîß Installing dependencies (this may take 1 minute)...
‚úÖ Dependencies installed
üìù Writing application files...
‚úÖ Files created
üöÄ Starting Streamlit...
‚úÖ Streamlit is running in background.

‚úÖ SHADOWLOG IS LIVE: https://uninvestigable-roxane-scablike.ngrok-free.dev
