In [5]:
import csv
import heapq
import logging
import mmap
import os
import re
import time
from collections import defaultdict
from datetime import datetime
from itertools import islice
from typing import Dict, Set, NamedTuple

class LogEntry(NamedTuple):
    """Immutable record holding the fields extracted from one log line."""
    # Client IP address, kept as the text that appeared in the log line.
    ip: str
    # Request target taken from the quoted request portion of the line.
    endpoint: str
    # Numeric status code that follows the quoted request.
    status_code: int
    # Time associated with the entry.
    timestamp: datetime

class OptimizedLogAnalyzer:
    """Chunked access-log analyzer tallying IPs, endpoints and failed logins.

    The log file is memory-mapped and consumed ``chunk_size`` bytes at a
    time.  Counters are stored on the instance and are never reset, so
    calling :meth:`analyze_file` more than once on the same instance
    accumulates totals across all analyzed files.
    """

    # Compiled once at class creation.  Capture groups, in order: dotted
    # IPv4 address, request target (the token after the method inside the
    # quoted request), and the status code following the closing quote.
    LOG_PATTERN = re.compile(
        r'(\d+\.\d+\.\d+\.\d+)[^"]+"\w+ ([^ ]+)[^"]+"\s+(\d+)'
    )

    def __init__(self, chunk_size: int = 8192):
        """
        Initialize analyzer with configurable chunk size for reading

        Args:
            chunk_size: Size of chunks to read from file (bytes)

        Raises:
            ValueError: If ``chunk_size`` is not a positive number.
        """
        # A zero-sized read never advances through the file, so reject it
        # up front instead of hanging inside analyze_file.
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number of bytes")
        self.chunk_size = chunk_size
        self.ip_requests: Dict[str, int] = defaultdict(int)
        self.endpoint_counts: Dict[str, int] = defaultdict(int)
        self.failed_logins: Dict[str, int] = defaultdict(int)
        self.suspicious_ips: Set[str] = set()

    def _parse_line_fast(self, line: str) -> LogEntry | None:
        """Parse one log line with the pre-compiled pattern.

        Args:
            line: A single decoded log line (without its newline).

        Returns:
            A ``LogEntry`` when the line matches ``LOG_PATTERN``, otherwise
            ``None``.
        """
        match = self.LOG_PATTERN.search(line)
        if match is None:
            return None

        ip, endpoint, status = match.groups()
        try:
            status_code = int(status)
        except ValueError:
            # int() can still reject a pathological digit run; treat such a
            # line as unparseable rather than aborting the analysis.
            return None
        return LogEntry(
            ip=ip,
            endpoint=endpoint,
            status_code=status_code,
            timestamp=datetime.now()  # Placeholder: the pattern captures no time
        )

    def _ingest_line(self, raw_line: bytes, failed_threshold: int) -> None:
        """Decode one raw line, parse it and fold it into the counters."""
        # Decoding happens per complete line, never per chunk, so a
        # multi-byte character can no longer be cut in half by a chunk
        # boundary.  Undecodable bytes are replaced instead of aborting.
        entry = self._parse_line_fast(raw_line.decode('utf-8', errors='replace'))
        if entry is None:
            return

        self.ip_requests[entry.ip] += 1
        self.endpoint_counts[entry.endpoint] += 1

        if entry.status_code == 401:
            self.failed_logins[entry.ip] += 1
            if self.failed_logins[entry.ip] >= failed_threshold:
                self.suspicious_ips.add(entry.ip)

    def analyze_file(self, filepath: str, failed_threshold: int = 3) -> Dict:
        """
        Memory-efficient file analysis using memory mapping

        Args:
            filepath: Path to log file
            failed_threshold: Number of 401 responses from one IP at which
                that IP is reported as suspicious

        Returns:
            Dict with keys ``'top_ips'`` (up to 10 ``(ip, count)`` pairs),
            ``'top_endpoints'`` (up to 5 ``(endpoint, count)`` pairs) and
            ``'suspicious'`` (sorted list of flagged IPs).

        Raises:
            OSError: If the file cannot be inspected or opened.
        """
        # An empty file cannot be memory-mapped; it simply has no lines.
        if os.path.getsize(filepath) > 0:
            with open(filepath, 'rb') as f:
                # The context manager closes the map even if a line fails.
                with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                    tail = b''
                    while chunk := mm.read(self.chunk_size):
                        # Everything before the last newline is complete;
                        # the remainder is carried into the next chunk.
                        *complete, tail = (tail + chunk).split(b'\n')
                        for raw_line in complete:
                            self._ingest_line(raw_line, failed_threshold)
                    # A final line without a trailing newline still counts.
                    if tail:
                        self._ingest_line(tail, failed_threshold)

        return {
            'top_ips': self._get_top_n(self.ip_requests, 10),
            'top_endpoints': self._get_top_n(self.endpoint_counts, 5),
            # Sorted so the report order does not depend on set iteration.
            'suspicious': sorted(self.suspicious_ips)
        }

    @staticmethod
    def _get_top_n(counter: Dict[str, int], n: int) -> list:
        """Return the ``n`` highest-count ``(key, count)`` pairs, largest first.

        Uses a bounded heap, so only ``n`` items are kept ordered instead of
        sorting the whole mapping.  Ties keep the mapping's iteration order.
        """
        return heapq.nlargest(n, counter.items(), key=lambda item: item[1])

    def save_results(self, results: Dict, output_file: str):
        """Write the analysis results to a CSV report.

        Args:
            results: Mapping as returned by :meth:`analyze_file`.
            output_file: Destination path; an existing file is overwritten.
        """
        top_endpoints = results['top_endpoints']

        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            writer.writerow(['Top IP Addresses by Request Count'])
            writer.writerow(['IP Address', 'Request Count'])
            writer.writerows(results['top_ips'])

            writer.writerow([])
            writer.writerow(['Most Frequently Accessed Endpoint'])
            writer.writerow(['Endpoint', 'Access Count'])
            # Only the single top endpoint is reported; with no parsed
            # lines there is none, so the section is left without a row.
            if top_endpoints:
                writer.writerow(top_endpoints[0])

            writer.writerow([])
            writer.writerow(['Suspicious IPs'])
            writer.writerow(['IP Address', 'Failed Login Attempts'])
            # .get() avoids inserting new keys into the defaultdict.
            writer.writerows((ip, self.failed_logins.get(ip, 0))
                             for ip in results['suspicious'])

def format_output(analyzer, results):
    """Print the analysis results to the console as three sections.

    Args:
        analyzer: Object exposing a ``failed_logins`` mapping of IP to
            failed-login count.
        results: Mapping with ``'top_ips'`` and ``'top_endpoints'`` (lists of
            ``(name, count)`` pairs) and ``'suspicious'`` (list of IPs).
    """
    rule = "-" * 40

    # IP Addresses section
    print("IP Addresses:")
    print(rule)
    print(f"{'IP Address':<20} {'Request Count':<15}")
    print(rule)
    for ip, count in results['top_ips']:
        print(f"{ip:<20} {count:<15}")
    print("\n")

    # Most Accessed Endpoint section (highest only)
    print("Most Frequently Accessed Endpoint:")
    top_endpoints = results['top_endpoints']
    if top_endpoints:
        top_endpoint, top_count = top_endpoints[0]
        print(f"{top_endpoint} (Accessed {top_count} times)")
    else:
        # No log line was parsed, so there is no endpoint to report.
        print("No endpoints recorded.")
    print("\n")

    # Suspicious Activity section
    if results['suspicious']:
        print("Suspicious Activity Detected:")
        print(rule)
        print(f"{'IP Address':<20} {'Failed Login Attempts':<15}")
        print(rule)
        for ip in results['suspicious']:
            # .get() tolerates a missing IP and avoids mutating a defaultdict.
            print(f"{ip:<20} {analyzer.failed_logins.get(ip, 0):<15}")
    else:
        print("No suspicious activity detected.")

def main(log_path: str = 'sample.log',
         output_path: str = 'log_analysis_results.csv') -> None:
    """Analyze a log file, print the report with timing, and save a CSV.

    Args:
        log_path: Log file to analyze.
        output_path: Where the CSV report is written.
    """
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments during the run.
    started = time.perf_counter()

    analyzer = OptimizedLogAnalyzer()
    results = analyzer.analyze_file(log_path)

    # Format and display results
    format_output(analyzer, results)

    # Print performance metrics
    elapsed = time.perf_counter() - started
    print(f"\nAnalysis completed in {elapsed:.2f} seconds")

    # Save results to CSV
    analyzer.save_results(results, output_path)


if __name__ == '__main__':
    main()


IP Addresses:
----------------------------------------
IP Address           Request Count  
----------------------------------------
203.0.113.5          8              
192.168.1.1          7              
198.51.100.23        7              
10.0.0.2             6              
192.168.1.100        5              


Most Frequently Accessed Endpoint:
/login (Accessed 13 times)


Suspicious Activity Detected:
----------------------------------------
IP Address           Failed Login Attempts
----------------------------------------
192.168.1.100        5              
203.0.113.5          8              

Analysis completed in 0.00 seconds
