<a href="https://colab.research.google.com/github/nrraim25-ops/OT-Log-Analyzer/blob/main/BasicLogAnalyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
import json
from collections import Counter, defaultdict
from datetime import datetime
import matplotlib.pyplot as plt
from pathlib import Path

class OTLogAnalyzer:
    """Parse OT/SCADA log files and summarise what they contain.

    Each line is first matched against a fixed SCADA layout
    (``<timestamp> <device> <severity> <message>``); lines that do not fit
    fall back to a generic parser that collects ``key=value`` tokens plus any
    timestamp, severity keyword and IPv4-looking addresses found in the line.

    Attributes:
        log_entries: List of parsed entries (dicts), in file order. Entries
            accumulate across repeated ``parse_log`` calls.
        statistics: Running totals updated for every parsed entry:
            ``total_entries``, ``error_count`` (ERROR + CRITICAL),
            ``warning_count``, ``device_counts`` (Counter), ``error_types``
            (Counter, not populated by the methods in this class) and
            ``hourly_activity`` (``'YYYY-MM-DD HH'`` -> count).
        patterns: Precompiled regular expressions used by the parsers.
    """

    # Bookkeeping fields written by the generic parser itself; key=value
    # tokens coming from the log must not be able to overwrite them.
    _RESERVED_GENERIC_KEYS = ('line_number', 'raw')

    def __init__(self):
        """Create an analyzer with empty results and precompiled patterns."""
        self.log_entries = []
        self.statistics = {
            'total_entries': 0,
            'error_count': 0,
            'warning_count': 0,
            'device_counts': Counter(),
            'error_types': Counter(),
            'hourly_activity': defaultdict(int)
        }

        # Precompile regex patterns once; they are reused for every line.
        self.patterns = {
            'timestamp': re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'),
            'ip': re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
            'severity': re.compile(r'\b(DEBUG|INFO|WARNING|ERROR|CRITICAL)\b'),
            'scada': re.compile(
                r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+'
                r'(?P<device>PLC|RTU|Firewall|HMI)\s+'
                r'(?P<severity>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+'
                r'(?P<message>.*)'
            )
        }

    def parse_log(self, filepath):
        """Parse a log file and extract structured information.

        Blank lines are skipped. Every other line becomes one entry, using the
        SCADA format when it matches and the generic parser otherwise.

        Args:
            filepath: Path of the text log file to read.

        Returns:
            ``self.log_entries`` (the same list object), including entries
            from any earlier ``parse_log`` call on this instance.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        print(f" Analyzing log file: {filepath}")

        # Decode explicitly as UTF-8 and substitute undecodable bytes, so one
        # corrupt byte in a device log does not abort the whole analysis and
        # the result does not depend on the platform's default encoding.
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                # Try the strict SCADA layout first, then the generic parser.
                entry = self._parse_scada_log(line)
                if not entry:
                    entry = self._parse_generic_log(line, line_num)

                if entry:
                    self.log_entries.append(entry)
                    self._update_statistics(entry)

        print(f"Parsed {len(self.log_entries)} log entries")
        return self.log_entries

    def _parse_scada_log(self, line):
        """Parse a line in the SCADA format.

        Args:
            line: One stripped log line.

        Returns:
            A dict with ``timestamp``, ``device``, ``severity``, ``message``
            and ``line`` (the full input line), or ``None`` when the SCADA
            pattern is not found anywhere in the line.
        """
        match = self.patterns['scada'].search(line)
        if match is None:
            return None

        entry = match.groupdict()
        entry['line'] = line
        return entry

    def _parse_generic_log(self, line, line_num):
        """Parse an arbitrary line, collecting ``key=value`` tokens and patterns.

        Args:
            line: One stripped log line.
            line_num: 1-based line number within the file.

        Returns:
            A dict that always has ``line_number`` and ``raw``; plus one item
            per whitespace-separated ``key=value`` token (surrounding double
            quotes removed from the value), and ``timestamp`` / ``severity`` /
            ``ips`` when the corresponding pattern is found in the line.
        """
        entry = {'line_number': line_num, 'raw': line}

        # Extract key=value pairs. Values containing spaces are not supported
        # because tokens are split on whitespace.
        for pair in line.split():
            if '=' not in pair:
                continue
            key, value = pair.split('=', 1)
            if key in self._RESERVED_GENERIC_KEYS:
                # Log content must not spoof the parser's own bookkeeping.
                continue
            entry[key] = value.strip('"')

        # Pattern matches take precedence over same-named key=value tokens.
        timestamp_match = self.patterns['timestamp'].search(line)
        if timestamp_match:
            entry['timestamp'] = timestamp_match.group()

        severity_match = self.patterns['severity'].search(line)
        if severity_match:
            entry['severity'] = severity_match.group()

        ip_matches = self.patterns['ip'].findall(line)
        if ip_matches:
            entry['ips'] = ip_matches

        return entry

    def _update_statistics(self, entry):
        """Fold one parsed entry into ``self.statistics``.

        Args:
            entry: A dict produced by one of the parsers. Only the optional
                ``severity``, ``device`` and ``timestamp`` keys are read.
        """
        self.statistics['total_entries'] += 1

        # Count by severity; CRITICAL is counted together with ERROR.
        if 'severity' in entry:
            severity = entry['severity'].upper()
            if severity in ('ERROR', 'CRITICAL'):
                self.statistics['error_count'] += 1
            elif severity == 'WARNING':
                self.statistics['warning_count'] += 1

        # Count by device (present for SCADA-format entries).
        if 'device' in entry:
            self.statistics['device_counts'][entry['device']] += 1

        # Hourly activity: bucket on the first 13 characters (YYYY-MM-DD HH).
        if 'timestamp' in entry:
            try:
                hour = entry['timestamp'][:13]
                self.statistics['hourly_activity'][hour] += 1
            except TypeError:
                # Best effort: a timestamp that cannot be sliced or hashed is
                # left out of the hourly buckets instead of aborting parsing.
                pass

    def detect_anomalies(self, threshold=2):
        """Flag hours whose event count is unusually high.

        With more than one hourly bucket, the cut-off is
        ``mean + threshold * std_dev`` using the population standard
        deviation of the hourly counts. With exactly one bucket the cut-off
        is ``2 * mean``.

        Args:
            threshold: Number of standard deviations above the mean at which
                an hour is considered anomalous.

        Returns:
            A list of dicts with ``hour``, ``count``, ``threshold`` (the
            numeric cut-off) and ``deviation`` (distance from the mean in
            standard deviations, ``inf`` when the deviation is zero). Empty
            when there is no hourly data.
        """
        hourly_activity = self.statistics['hourly_activity']
        if not hourly_activity:
            return []

        # Calculate baseline.
        hourly_counts = list(hourly_activity.values())
        mean = sum(hourly_counts) / len(hourly_counts)

        # Always bind std_dev so the deviation below never hits an unbound
        # name, whichever branch computed the cut-off.
        std_dev = 0.0
        if len(hourly_counts) > 1:
            variance = sum((x - mean) ** 2 for x in hourly_counts) / len(hourly_counts)
            std_dev = variance ** 0.5
            anomaly_threshold = mean + (threshold * std_dev)
        else:
            anomaly_threshold = mean * 2

        return [
            {
                'hour': hour,
                'count': count,
                'threshold': anomaly_threshold,
                'deviation': (count - mean) / std_dev if std_dev > 0 else float('inf'),
            }
            for hour, count in hourly_activity.items()
            if count > anomaly_threshold
        ]

    def generate_report(self, output_format='text'):
        """Build an analysis report from the current statistics.

        Args:
            output_format: ``'json'`` for a JSON string; any other value
                yields the plain-text layout.

        Returns:
            The report as a string. It covers the summary rates (percentages
            of all entries, two decimals), the per-device counts ordered from
            most to least frequent, the hourly activity and the anomalies
            found with the default threshold.
        """
        # max(..., 1) avoids dividing by zero before any log has been parsed.
        total = max(self.statistics['total_entries'], 1)
        report = {
            'summary': {
                'total_entries': self.statistics['total_entries'],
                'error_rate': f"{(self.statistics['error_count'] / total * 100):.2f}%",
                'warning_rate': f"{(self.statistics['warning_count'] / total * 100):.2f}%",
            },
            'device_breakdown': dict(self.statistics['device_counts'].most_common()),
            'hourly_activity': dict(self.statistics['hourly_activity']),
            'anomalies': self.detect_anomalies()
        }

        if output_format == 'json':
            return json.dumps(report, indent=2)
        return self._format_text_report(report)

    def _format_text_report(self, report):
        """Render a report dict as readable text.

        Args:
            report: Dict shaped like the one built by ``generate_report``.

        Returns:
            A newline-joined string. The device and anomaly sections are
            omitted when they are empty; hourly activity is not printed.
        """
        summary = report['summary']
        lines = [
            "=" * 60,
            " OT LOG ANALYZER REPORT",
            "=" * 60,
            "\n SUMMARY:",
            f"  Total Entries: {summary['total_entries']}",
            f"  Error Rate: {summary['error_rate']}",
            f"  Warning Rate: {summary['warning_rate']}",
        ]

        if report['device_breakdown']:
            lines.append("\n DEVICE BREAKDOWN:")
            for device, count in report['device_breakdown'].items():
                lines.append(f"  {device}: {count}")

        if report['anomalies']:
            lines.append("\n ANOMALIES DETECTED:")
            for anomaly in report['anomalies']:
                lines.append(f"  {anomaly['hour']}: {anomaly['count']} events (threshold: {anomaly['threshold']:.0f})")

        return "\n".join(lines)

    def visualize(self):
        """Plot events per hour and mark anomalous hours in red.

        Uses the module-level ``plt`` (matplotlib.pyplot) and shows the
        figure. Prints a message and returns when there is no hourly data.
        """
        if not self.statistics['hourly_activity']:
            print("No data to visualize")
            return

        # Prepare data: hour keys sort chronologically as strings.
        hours = sorted(self.statistics['hourly_activity'].keys())
        counts = [self.statistics['hourly_activity'][h] for h in hours]
        positions = range(len(hours))

        # Create plot.
        plt.figure(figsize=(12, 6))
        plt.plot(positions, counts, marker='o', linestyle='-', linewidth=2)
        plt.title('OT System Activity Over Time')
        plt.xlabel('Time (Hour)')
        plt.ylabel('Number of Log Events')
        # Tick labels show the last five characters of the bucket key ("DD HH").
        plt.xticks(positions, [h[-5:] for h in hours], rotation=45)
        plt.grid(True, alpha=0.3)

        # Highlight anomalies; a dict gives each hour's x position directly
        # instead of scanning the list once per anomaly.
        position_of = {hour: idx for idx, hour in enumerate(hours)}
        for anomaly in self.detect_anomalies():
            idx = position_of.get(anomaly['hour'])
            if idx is not None:
                plt.plot(idx, anomaly['count'], 'ro', markersize=10)

        plt.tight_layout()
        plt.show()

# Example usage
def main():
    """Run a small end-to-end demo of the analyzer.

    Writes eight sample SCADA lines to ``sample_ot.log`` in the current
    directory, parses that file and prints the plain-text report.
    """
    demo_path = 'sample_ot.log'

    # Sample data; the surrounding blank line and indentation are stripped.
    sample_text = """
2026-02-19 08:23:45 PLC INFO System startup complete
2026-02-19 08:23:46 PLC INFO Connection established to SCADA
2026-02-19 08:24:12 Firewall INFO Connection from 192.168.1.100 allowed
2026-02-19 08:25:33 RTU WARNING Communication timeout, retry 1/3
2026-02-19 08:26:45 PLC INFO Normal operation mode
2026-02-19 09:15:22 Firewall ERROR Unauthorized access attempt from 10.0.0.50
2026-02-19 09:16:01 PLC CRITICAL Unexpected shutdown detected
2026-02-19 09:16:45 HMI INFO Operator logged in
    """
    demo_lines = sample_text.strip().splitlines()

    # Persist the sample, one newline-terminated entry per line.
    with open(demo_path, 'w') as log_file:
        log_file.writelines(entry + '\n' for entry in demo_lines)

    # Parse the file just written and show the text report.
    analyzer = OTLogAnalyzer()
    analyzer.parse_log(demo_path)
    print(analyzer.generate_report())

    # To see the hourly activity plot as well, call analyzer.visualize() here.

# Run the demo only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

 Analyzing log file: sample_ot.log
Parsed 8 log entries
 OT LOG ANALYZER REPORT

 SUMMARY:
  Total Entries: 8
  Error Rate: 25.00%

 DEVICE BREAKDOWN:
  PLC: 4
  Firewall: 2
  RTU: 1
  HMI: 1
