🔍 Pipeline Overview
* Parse logs: Read log data (e.g., from a .log or .csv file).

* Extract relevant fields: Timestamp, IP, message, severity, etc.

* Define threat rules: Based on IP repetition, error types, login failures, etc.

* Detect anomalies: Using heuristics or ML (if needed).

* Summarize findings: Show suspicious IPs, timestamps, messages.
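The first two steps can be tried without the original log file. The sketch below is a minimal example, assuming the line format used in this notebook (`timestamp,IP:<address>,EVENT:<message>`). The names `SAMPLE_LINES`, `write_sample_log`, and `parse_line` are hypothetical, and the sample data is made up for illustration: the five failed logins mirror the timestamps in the notebook output, and the last line is deliberately malformed.

```python
import re
import tempfile
from pathlib import Path

# Line format assumed from this notebook:
# 2025-07-10 14:23:01,IP:192.168.1.5,EVENT:Failed login for user 'admin'
LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),"
    r"IP:(?P<ip>[\d.]+),EVENT:(?P<event>.+)"
)

# Hypothetical sample data for experimenting with the parser.
SAMPLE_LINES = [
    "2025-07-10 14:23:01,IP:192.168.1.5,EVENT:Failed login for user 'admin'",
    "2025-07-10 14:23:15,IP:192.168.1.5,EVENT:Failed login for user 'admin'",
    "2025-07-10 14:23:30,IP:192.168.1.5,EVENT:Failed login for user 'admin'",
    "2025-07-10 14:23:45,IP:192.168.1.5,EVENT:Failed login for user 'admin'",
    "2025-07-10 14:24:01,IP:192.168.1.5,EVENT:Failed login for user 'admin'",
    "this line does not match the format",
]


def write_sample_log(directory):
    """Write the sample lines to security_logs.txt inside the given directory."""
    path = Path(directory) / "security_logs.txt"
    path.write_text("\n".join(SAMPLE_LINES) + "\n", encoding="utf-8")
    return path


def parse_line(line):
    """Return the extracted fields as a dict, or None if the line does not match."""
    match = LOG_PATTERN.match(line.strip())
    return match.groupdict() if match else None


with tempfile.TemporaryDirectory() as tmp:
    log_path = write_sample_log(tmp)
    records = [parse_line(line) for line in log_path.read_text(encoding="utf-8").splitlines()]
    parsed = [record for record in records if record is not None]

print(len(parsed), "of", len(records), "lines parsed")
print(parsed[0])
```

Lines that do not match the pattern are skipped silently here, as in the notebook's own parser. Counting them, as the first `print` does, is a cheap way to notice a format mismatch.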

In [1]:
import pandas as pd
import re
from collections import defaultdict
from datetime import datetime, timedelta

Sample Python code that checks the security logs against the following threat rules:


*   Multiple failed login attempts from the same IP within a short time.
*   Access to restricted endpoints.
*   Known vulnerability patterns in request URLs.
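The functions in this notebook cover the first two rules. For the third, a minimal sketch might look like the following. The three signatures in `VULN_PATTERNS` are illustrative assumptions, not a complete or production-grade rule set, and `match_vulnerability_patterns` is a hypothetical helper name.

```python
import re
from urllib.parse import unquote

# Illustrative signatures only (assumptions); real rule sets are far larger.
VULN_PATTERNS = {
    "path traversal": re.compile(r"\.\./|\.\.\\"),
    "SQL injection": re.compile(
        r"\bunion\b.+\bselect\b|\bor\b\s+1\s*=\s*1", re.IGNORECASE
    ),
    "cross-site scripting": re.compile(r"<script\b", re.IGNORECASE),
}


def match_vulnerability_patterns(url):
    """Return the names of all signatures found in the percent-decoded URL."""
    decoded = unquote(url)
    return [name for name, pattern in VULN_PATTERNS.items() if pattern.search(decoded)]


for url in [
    "/index.html",
    "/files?name=..%2F..%2Fetc%2Fpasswd",
    "/search?q=1%27%20OR%201=1",
]:
    print(url, "->", match_vulnerability_patterns(url))
```

Decoding with `unquote` before matching matters because attack strings are often percent-encoded in request URLs.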

In [3]:


# Sample log format: 2025-07-10 14:23:01,IP:192.168.1.5,EVENT:Failed login for user 'admin'
LOG_PATTERN = r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),IP:(?P<ip>[\d\.]+),EVENT:(?P<event>.+)"

def parse_logs(log_file_path):
    logs = []
    with open(log_file_path, 'r') as file:
        for line in file:
            match = re.match(LOG_PATTERN, line.strip())
            if match:
                logs.append(match.groupdict())
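    # Name the columns explicitly so an empty or non-matching file still yields the expected schema
    return pd.DataFrame(logs, columns=['timestamp', 'ip', 'event'])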

def preprocess_logs(df):
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df

def detect_failed_logins(df, threshold=5, window_minutes=5):
    threats = []
    df_failed = df[df['event'].str.contains("Failed login", case=False)]
    grouped = df_failed.groupby('ip')
    for ip, group in grouped:
        group = group.sort_values('timestamp')
        for i in range(len(group) - threshold + 1):
            if group.iloc[i + threshold - 1]['timestamp'] - group.iloc[i]['timestamp'] <= timedelta(minutes=window_minutes):
                threats.append({
                    'ip': ip,
                    'threat_type': 'Brute-force login',
                    'start_time': group.iloc[i]['timestamp'],
                    'end_time': group.iloc[i + threshold - 1]['timestamp'],
                    'attempts': threshold
                })
                break
    return threats

def detect_restricted_access(df):
    threats = []
    restricted_patterns = ["admin", "config", "passwd", ".env"]
    for _, row in df.iterrows():
        if any(pat in row['event'].lower() for pat in restricted_patterns):
            threats.append({
                'ip': row['ip'],
                'threat_type': 'Restricted area access attempt',
                'timestamp': row['timestamp'],
                'event': row['event']
            })
    return threats

def analyze_logs(file_path):
    df = parse_logs(file_path)
    df = preprocess_logs(df)
    brute_force_threats = detect_failed_logins(df)
    restricted_access_threats = detect_restricted_access(df)
    return brute_force_threats + restricted_access_threats


threat_report = analyze_logs("/content/sample_data/security_logs.txt")
for threat in threat_report:
    print(threat)


{'ip': '192.168.1.5', 'threat_type': 'Brute-force login', 'start_time': Timestamp('2025-07-10 14:23:01'), 'end_time': Timestamp('2025-07-10 14:24:01'), 'attempts': 5}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:01'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:15'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:30'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:45'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:24:01'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.9', 'threat_typ
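In the output above, every `Failed login for user 'admin'` event is also reported as a restricted area access attempt, because the substring `admin` matches the username rather than an endpoint. One possible refinement, sketched below, checks only tokens that look like URL paths. It assumes that events touching an endpoint contain a request path such as `GET /admin/login`; the sample logs do not confirm that event shape, and `restricted_paths` is a hypothetical helper.

```python
import re

RESTRICTED_NAMES = ["admin", "config", "passwd", ".env"]

# A path-like token: a slash followed by any run of non-space characters.
PATH_TOKEN = re.compile(r"/\S*")


def restricted_paths(event):
    """Return the path-like tokens in an event that contain a restricted name."""
    hits = []
    for path in PATH_TOKEN.findall(event.lower()):
        if any(name in path for name in RESTRICTED_NAMES):
            hits.append(path)
    return hits


print(restricted_paths("Failed login for user 'admin'"))  # []
print(restricted_paths("GET /admin/login HTTP/1.1"))      # ['/admin/login']
```

With this check, a failed login for the `admin` user is still caught by the brute-force rule but no longer counted twice.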

🔧 **Extended Plan: Add Anomaly Detection**
We'll:

* Convert IPs to numerical values.

* Extract features from log events (event type counts, time of access).

* Fit an Isolation Forest to detect abnormal patterns.
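The three steps above can be sketched on a toy dataset before wiring them into the pipeline. This is a minimal illustration under stated assumptions: `RECORDS` is hypothetical data, `build_features` is a hypothetical helper, and the standard-library `ipaddress` module is used here for the IP conversion as an alternative to `socket`/`struct`.

```python
import ipaddress
from collections import defaultdict
from statistics import median

from sklearn.ensemble import IsolationForest

# Hypothetical parsed records: (ip, hour_of_day, event_text).
RECORDS = [
    ("192.168.1.5", 14, "Failed login for user 'admin'"),
    ("192.168.1.5", 14, "Failed login for user 'admin'"),
    ("192.168.1.5", 14, "Failed login for user 'admin'"),
    ("192.168.1.7", 10, "User login successful"),
    ("192.168.1.8", 11, "User login successful"),
    ("192.168.1.9", 3, "Failed login for user 'root'"),
    ("192.168.1.10", 9, "User login successful"),
    ("192.168.1.11", 15, "User login successful"),
]


def ip_to_int(ip):
    """Convert a dotted-quad IPv4 string to its 32-bit integer value."""
    return int(ipaddress.IPv4Address(ip))


def build_features(records):
    """Aggregate one row per IP: [ip_num, median_hour, failed_count, event_count]."""
    by_ip = defaultdict(list)
    for ip, hour, event in records:
        by_ip[ip].append((hour, event))
    ips = sorted(by_ip)
    rows = []
    for ip in ips:
        hours = [hour for hour, _ in by_ip[ip]]
        failed = sum("failed" in event.lower() for _, event in by_ip[ip])
        rows.append([ip_to_int(ip), median(hours), failed, len(hours)])
    return ips, rows


ips, rows = build_features(RECORDS)
model = IsolationForest(n_estimators=100, random_state=42)
labels = model.fit_predict(rows)        # -1 marks an outlier, 1 an inlier
scores = model.decision_function(rows)  # lower scores are more anomalous
for ip, label, score in zip(ips, labels, scores):
    print(ip, int(label), round(float(score), 3))
```

With only a handful of IPs the labels are not very meaningful, so ranking by `decision_function` score may be more informative than the binary flag. The numeric IP is an ordinal encoding of an identifier, so treat any split on it with caution.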

In [4]:
import pandas as pd
import re
import socket
import struct
from sklearn.ensemble import IsolationForest
from datetime import timedelta
import numpy as np

LOG_PATTERN = r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),IP:(?P<ip>[\d\.]+),EVENT:(?P<event>.+)"

def parse_logs(log_file_path):
    logs = []
    with open(log_file_path, 'r') as file:
        for line in file:
            match = re.match(LOG_PATTERN, line.strip())
            if match:
                logs.append(match.groupdict())
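    # Name the columns explicitly so an empty or non-matching file still yields the expected schema
    return pd.DataFrame(logs, columns=['timestamp', 'ip', 'event'])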

def preprocess_logs(df):
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['hour'] = df['timestamp'].dt.hour
    df['event_lower'] = df['event'].str.lower()
    df['event_len'] = df['event'].str.len()
    df['ip_num'] = df['ip'].apply(ip_to_int)
    return df

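def ip_to_int(ip):
    # Map a dotted-quad IPv4 string to its 32-bit integer; fall back to 0 for invalid input
    try:
        return struct.unpack("!I", socket.inet_aton(ip))[0]
    except (OSError, TypeError):
        return 0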

def detect_failed_logins(df, threshold=5, window_minutes=5):
    threats = []
    df_failed = df[df['event_lower'].str.contains("failed login")]
    grouped = df_failed.groupby('ip')
    for ip, group in grouped:
        group = group.sort_values('timestamp')
        for i in range(len(group) - threshold + 1):
            if group.iloc[i + threshold - 1]['timestamp'] - group.iloc[i]['timestamp'] <= timedelta(minutes=window_minutes):
                threats.append({
                    'ip': ip,
                    'threat_type': 'Brute-force login',
                    'start_time': group.iloc[i]['timestamp'],
                    'end_time': group.iloc[i + threshold - 1]['timestamp'],
                    'attempts': threshold
                })
                break
    return threats

def detect_restricted_access(df):
    threats = []
    restricted_patterns = ["admin", "config", "passwd", ".env"]
    for _, row in df.iterrows():
        if any(pat in row['event_lower'] for pat in restricted_patterns):
            threats.append({
                'ip': row['ip'],
                'threat_type': 'Restricted area access attempt',
                'timestamp': row['timestamp'],
                'event': row['event']
            })
    return threats

def extract_features_for_ml(df):
    df_grouped = df.groupby('ip').agg({
        'event_len': 'mean',
        'hour': 'median',
        'ip_num': 'first',
        'event_lower': lambda x: sum(['failed' in e for e in x])  # Count of "failed" in events
    }).rename(columns={'event_lower': 'failed_count'})
    return df_grouped.reset_index()

def detect_anomalies(df_features):
    model = IsolationForest(n_estimators=100, contamination=0.1, random_state=42)
    df_features['anomaly'] = model.fit_predict(df_features[['event_len', 'hour', 'ip_num', 'failed_count']])
    anomalies = df_features[df_features['anomaly'] == -1]
    return anomalies[['ip', 'event_len', 'hour', 'failed_count']]

def analyze_logs_with_ml(file_path):
    df = parse_logs(file_path)
    df = preprocess_logs(df)

    brute_force_threats = detect_failed_logins(df)
    restricted_access_threats = detect_restricted_access(df)

    features = extract_features_for_ml(df)
    anomaly_ips = detect_anomalies(features)

    anomaly_threats = [{
        'ip': row['ip'],
        'threat_type': 'Anomalous behavior detected by ML',
        'details': {
            'mean_event_length': row['event_len'],
            'median_hour': row['hour'],
            'failed_login_events': row['failed_count']
        }
    } for _, row in anomaly_ips.iterrows()]

    return brute_force_threats + restricted_access_threats + anomaly_threats

# Example usage
threat_report = analyze_logs_with_ml("/content/sample_data/security_logs.txt")
for threat in threat_report:
    print(threat)


{'ip': '192.168.1.5', 'threat_type': 'Brute-force login', 'start_time': Timestamp('2025-07-10 14:23:01'), 'end_time': Timestamp('2025-07-10 14:24:01'), 'attempts': 5}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:01'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:15'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:30'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:23:45'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.5', 'threat_type': 'Restricted area access attempt', 'timestamp': Timestamp('2025-07-10 14:24:01'), 'event': "Failed login for user 'admin'"}
{'ip': '192.168.1.9', 'threat_typ
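The brute-force entry above always reports `attempts` equal to the threshold, because `detect_failed_logins` stops at the first window that qualifies. If the actual number of attempts is of interest, a sliding window can report the densest burst instead. The sketch below uses only the standard library; `max_attempts_in_window` is a hypothetical helper, and the final timestamp in the example is made up.

```python
from collections import deque
from datetime import datetime, timedelta


def max_attempts_in_window(timestamps, window=timedelta(minutes=5)):
    """Return (count, start, end) for the densest window of the given length."""
    best = (0, None, None)
    recent = deque()
    for ts in sorted(timestamps):
        recent.append(ts)
        # Drop attempts that fall outside the window ending at ts.
        while ts - recent[0] > window:
            recent.popleft()
        if len(recent) > best[0]:
            best = (len(recent), recent[0], ts)
    return best


attempts = [
    datetime(2025, 7, 10, 14, 23, 1),
    datetime(2025, 7, 10, 14, 23, 15),
    datetime(2025, 7, 10, 14, 23, 30),
    datetime(2025, 7, 10, 14, 23, 45),
    datetime(2025, 7, 10, 14, 24, 1),
    datetime(2025, 7, 10, 14, 40, 0),  # hypothetical later attempt, outside the burst
]
count, start, end = max_attempts_in_window(attempts)
print(count, start, end)  # 5 2025-07-10 14:23:01 2025-07-10 14:24:01
```

Each timestamp enters and leaves the deque at most once, so the scan is linear after sorting, whereas the `iloc`-based loop performs a positional lookup per candidate window.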

In [5]:
pip install streamlit pandas scikit-learn

Collecting streamlit
  Downloading streamlit-1.46.1-py3-none-any.whl.metadata (9.0 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.46.1-py3-none-any.whl (10.1 MB)
Downloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 kB)

In [6]:
import streamlit as st
import pandas as pd
import re
import socket
import struct
from sklearn.ensemble import IsolationForest
from datetime import timedelta
import numpy as np

LOG_PATTERN = r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),IP:(?P<ip>[\d\.]+),EVENT:(?P<event>.+)"

def parse_logs(log_file_path):
    logs = []
    with open(log_file_path, 'r') as file:
        for line in file:
            match = re.match(LOG_PATTERN, line.strip())
            if match:
                logs.append(match.groupdict())
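    # Name the columns explicitly so an empty or non-matching file still yields the expected schema
    return pd.DataFrame(logs, columns=['timestamp', 'ip', 'event'])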

def preprocess_logs(df):
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['hour'] = df['timestamp'].dt.hour
    df['event_lower'] = df['event'].str.lower()
    df['event_len'] = df['event'].str.len()
    df['ip_num'] = df['ip'].apply(ip_to_int)
    return df

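def ip_to_int(ip):
    # Map a dotted-quad IPv4 string to its 32-bit integer; fall back to 0 for invalid input
    try:
        return struct.unpack("!I", socket.inet_aton(ip))[0]
    except (OSError, TypeError):
        return 0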

def detect_failed_logins(df, threshold=5, window_minutes=5):
    threats = []
    df_failed = df[df['event_lower'].str.contains("failed login")]
    grouped = df_failed.groupby('ip')
    for ip, group in grouped:
        group = group.sort_values('timestamp')
        for i in range(len(group) - threshold + 1):
            if group.iloc[i + threshold - 1]['timestamp'] - group.iloc[i]['timestamp'] <= timedelta(minutes=window_minutes):
                threats.append({
                    'ip': ip,
                    'threat_type': 'Brute-force login',
                    'start_time': group.iloc[i]['timestamp'],
                    'end_time': group.iloc[i + threshold - 1]['timestamp'],
                    'attempts': threshold
                })
                break
    return threats

def detect_restricted_access(df):
    threats = []
    restricted_patterns = ["admin", "config", "passwd", ".env"]
    for _, row in df.iterrows():
        if any(pat in row['event_lower'] for pat in restricted_patterns):
            threats.append({
                'ip': row['ip'],
                'threat_type': 'Restricted area access attempt',
                'timestamp': row['timestamp'],
                'event': row['event']
            })
    return threats

def extract_features_for_ml(df):
    df_grouped = df.groupby('ip').agg({
        'event_len': 'mean',
        'hour': 'median',
        'ip_num': 'first',
        'event_lower': lambda x: sum(['failed' in e for e in x])  # Count of "failed" in events
    }).rename(columns={'event_lower': 'failed_count'})
    return df_grouped.reset_index()

def detect_anomalies(df_features):
    model = IsolationForest(n_estimators=100, contamination=0.1, random_state=42)
    df_features['anomaly'] = model.fit_predict(df_features[['event_len', 'hour', 'ip_num', 'failed_count']])
    anomalies = df_features[df_features['anomaly'] == -1]
    return anomalies[['ip', 'event_len', 'hour', 'failed_count']]

def analyze_logs_with_ml(file_path):
    df = parse_logs(file_path)
    df = preprocess_logs(df)

    brute_force_threats = detect_failed_logins(df)
    restricted_access_threats = detect_restricted_access(df)

    features = extract_features_for_ml(df)
    anomaly_ips = detect_anomalies(features)

    anomaly_threats = [{
        'ip': row['ip'],
        'threat_type': 'Anomalous behavior detected by ML',
        'details': {
            'mean_event_length': row['event_len'],
            'median_hour': row['hour'],
            'failed_login_events': row['failed_count']
        }
    } for _, row in anomaly_ips.iterrows()]

    return brute_force_threats + restricted_access_threats + anomaly_threats

# Upload and parse logs
st.title("🔒 Security Log Threat Analyzer")

uploaded_file = st.file_uploader("Upload your log file (.txt)", type=["txt"])
if uploaded_file:
    # Save uploaded file temporarily
    with open("temp_logs.txt", "wb") as f:
        f.write(uploaded_file.getbuffer())

    # Analyze logs
    with st.spinner("Analyzing logs..."):
        threats = analyze_logs_with_ml("temp_logs.txt")

    # Convert threats to DataFrame
    df_threats = pd.DataFrame(threats)

    if df_threats.empty:
        # No findings: the filters and charts below need a 'threat_type' column
        st.info("No potential threats detected")
        st.stop()

    st.success(f"Detected {len(df_threats)} potential threats")
    st.dataframe(df_threats)

    # Filter controls
    unique_types = df_threats['threat_type'].unique().tolist()
    selected_types = st.multiselect("Filter by threat type", unique_types, default=unique_types)

    filtered_df = df_threats[df_threats['threat_type'].isin(selected_types)]

    # Visualize
    st.subheader("📊 Threat Summary")
    st.bar_chart(filtered_df['threat_type'].value_counts())

    if 'details' in df_threats.columns:
        # Only ML findings carry a 'details' dict; expand it into one column per metric
        df_details = df_threats.dropna(subset=['details'])
        if not df_details.empty:
            st.subheader("📈 ML-Based Anomaly Insights")
            details_expanded = df_details['details'].apply(pd.Series)
            st.line_chart(details_expanded)


2025-07-11 06:08:28.615 
  command:

    streamlit run /usr/local/lib/python3.11/dist-packages/colab_kernel_launcher.py [ARGUMENTS]
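The app above writes the upload to `temp_logs.txt` before parsing it. An alternative, sketched below, is a parser that accepts any iterable of text lines, so the uploaded bytes can be decoded and parsed in memory. `parse_log_lines` is a hypothetical helper, and `raw_bytes` stands in for what `uploaded_file.getvalue()` would return inside the Streamlit app.

```python
import io
import re

LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),"
    r"IP:(?P<ip>[\d.]+),EVENT:(?P<event>.+)"
)


def parse_log_lines(lines):
    """Parse an iterable of text lines into a list of field dictionaries."""
    records = []
    for line in lines:
        match = LOG_PATTERN.match(line.strip())
        if match:
            records.append(match.groupdict())
    return records


# Stand-in for an upload; in the app this would be uploaded_file.getvalue().
raw_bytes = (
    b"2025-07-10 14:23:01,IP:192.168.1.5,EVENT:Failed login for user 'admin'\n"
    b"not a log line\n"
)
text = raw_bytes.decode("utf-8", errors="replace")
records = parse_log_lines(io.StringIO(text))
print(records)
```

Skipping the temporary file also avoids two concurrent sessions overwriting each other's `temp_logs.txt`.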


In [None]:

# prompt: run the above streamlit app

!streamlit run /content/sample_data/main.py & npx localtunnel --port 8501

[1G[0K⠙
Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.150.227.64:8501[0m
[0m
[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K[1G[0JNeed to install the following packages:
localtunnel@2.0.2
Ok to proceed? (y) [20Gy

[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0Kyour url is: https://lemon-goats-hang.loca.lt


Threat Analysis Recommendations Report
1. Strengthen Authentication Mechanisms
- Enforce strong password policies
- Implement multi-factor authentication (MFA)
- Lock accounts after repeated failed login attempts
- Use CAPTCHA to block brute-force bots
2. Harden Access to Sensitive Endpoints
- Restrict access to /admin, /config, etc.
- Move admin panels behind a VPN or an IP allowlist
- Rename admin paths and secure .env files
- Disable directory listings
3. Block or Check Reputation of Suspicious IPs
- Use Geo-blocking
- Integrate threat intelligence feeds
- Automatically block repeat offenders
4. Improve Monitoring and Alerting
- Use SIEM tools (ELK, Splunk)
- Set real-time alerts for anomalies and failures
- Integrate Slack/email notifications
5. Enhance Logging Practices
- Log user agents, paths, session data
- Implement secure, append-only logging
- Apply log rotation and retention policies
6. Leverage ML More Deeply
- Train models on historical logs
- Use time-series analysis to detect slow, low-volume attacks
- Apply clustering to uncover new threat patterns
7. Automate Threat Response
- Integrate with WAF/Security groups
- Automate actions using SOAR platforms
- Create response playbooks for common threats
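
As a starting point for the "Automatically block repeat offenders" recommendation, the threat report produced earlier can be reduced to a candidate blocklist. The sketch below only counts findings per IP; `repeat_offenders` and the `min_findings` threshold are assumptions, the sample report is hypothetical, and the actual blocking step (firewall, WAF, or security group update) is left out.

```python
from collections import Counter


def repeat_offenders(threat_report, min_findings=3):
    """Return the sorted IPs that appear in at least min_findings threat entries."""
    counts = Counter(threat["ip"] for threat in threat_report if "ip" in threat)
    return sorted(ip for ip, n in counts.items() if n >= min_findings)


# Hypothetical report in the same shape as the analyzer's output.
report = [
    {"ip": "192.168.1.5", "threat_type": "Brute-force login"},
    {"ip": "192.168.1.5", "threat_type": "Restricted area access attempt"},
    {"ip": "192.168.1.5", "threat_type": "Restricted area access attempt"},
    {"ip": "192.168.1.9", "threat_type": "Restricted area access attempt"},
]
print(repeat_offenders(report))  # ['192.168.1.5']
```

A human review step before blocking is worth keeping, since a shared NAT address can accumulate findings from many legitimate users.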