In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import numpy as np
import random
import re
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from sklearn.metrics import classification_report, f1_score
import json
from typing import List, Tuple, Dict, Optional
import warnings
warnings.filterwarnings('ignore')
torch.manual_seed(420)
np.random.seed(420)
random.seed(420)

class LogDataGenerator:
    """Generate synthetic log data for training"""
    
    def __init__(self):
        self.usernames = ['admin', 'user', 'guest', 'root', 'service', 'test', 'demo', 'john.doe', 'alice', 'bob']
        self.ip_ranges = ['192.168.1.', '10.0.0.', '172.16.0.', '203.0.113.', '198.51.100.']
        self.ports = ['22', '80', '443', '8080', '3389', '21', '23', '25']
        self.durations = ['0.1s', '0.5s', '1.2s', '2.3s', '0.8s', '1.5s', '3.1s']
        
        self.success_templates = [
            "{timestamp} - Login successful for user {username} from {ip}",
            "{timestamp} - Authentication succeeded for {username} from {ip} on port {port}",
            "{timestamp} - User {username} logged in successfully from {ip}",
            "{timestamp} - Successful login: {username} from {ip} (duration: {duration})",
            "{timestamp} - Access granted to user {username} from {ip}",
            "{timestamp} - Logged in successfully for user {username} from {ip}",
            "{timestamp} - Logged in successfully for user {username} from {ip}"
        ]
        
        self.failure_templates = [
            "{timestamp} - Login failed for user {username} from {ip}",
            "{timestamp} - Authentication failed for {username} from {ip}",
            "{timestamp} - Failed login attempt by {username} from {ip}",
            "{timestamp} - Access denied for user {username} from {ip} on port {port}",
            "{timestamp} - Login failure: {username} from {ip} (invalid credentials)",
        ]
    
    def generate_timestamp(self):
        """Generate random timestamp"""
        start_date = datetime(2023, 1, 1)
        end_date = datetime(2023, 12, 31)
        random_date = start_date + timedelta(
            seconds=random.randint(0, int((end_date - start_date).total_seconds()))
        )
        return random_date.strftime("%Y-%m-%d %H:%M:%S")
    
    def generate_ip(self):
        """Generate random IP address"""
        return random.choice(self.ip_ranges) + str(random.randint(1, 254))
    
    def generate_log_entry(self, success: bool) -> str:
        """Generate a single log entry"""
        template = random.choice(self.success_templates if success else self.failure_templates)
        
        params = {
            'timestamp': self.generate_timestamp(),
            'username': random.choice(self.usernames),
            'ip': self.generate_ip(),
            'port': random.choice(self.ports),
            'duration': random.choice(self.durations)
        }
        
        return template.format(**params)
    
    def generate_dataset(self, n_success: int = 1000, n_failure: int = 1000) -> List[str]:
        """Generate complete dataset"""
        logs = []
        
        for _ in range(n_success):
            logs.append(self.generate_log_entry(True))
    
        for _ in range(n_failure):
            logs.append(self.generate_log_entry(False))
        
        random.shuffle(logs)
        return logs

class BIOTagger:
    """Convert logs to BIO format for NER training"""
    
    def __init__(self):
        self.entity_patterns = {
            'TIMESTAMP': r'\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}',
            'IP_ADDRESS': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
            'USERNAME': r'(?:user\s+|for\s+)([a-zA-Z0-9._-]+)(?:\s+from|\s+on|\s*$)',
            'PORT': r'port\s+(\d+)',
            'DURATION': r'\(duration:\s+([0-9.]+s)\)',
            'STATUS': r'(successful|succeeded|successfully|failed|failure|denied|granted)'
        }
    
    def tokenize(self, text: str) -> List[str]:
        """Simple tokenization"""
        # Split on whitespace and punctuation, but keep them
        tokens = re.findall(r'\w+|[^\w\s]', text)
        return tokens
    
    def tag_entities(self, text: str) -> List[Tuple[str, str]]:
        """Tag entities in text using BIO format"""
        tokens = self.tokenize(text)
        tags = ['O'] * len(tokens)
        
        # Join tokens back to text for pattern matching
        token_positions = []
        current_pos = 0
        for token in tokens:
            start = text.find(token, current_pos)
            end = start + len(token)
            token_positions.append((start, end))
            current_pos = end
        
        # Apply entity patterns
        for entity_type, pattern in self.entity_patterns.items():
            for match in re.finditer(pattern, text, re.IGNORECASE):
                start, end = match.span()
                
                # Find overlapping tokens
                entity_tokens = []
                for i, (token_start, token_end) in enumerate(token_positions):
                    if token_start >= start and token_end <= end:
                        entity_tokens.append(i)
                
                # Apply BIO tagging
                if entity_tokens:
                    tags[entity_tokens[0]] = f'B-{entity_type}'
                    for i in entity_tokens[1:]:
                        tags[i] = f'I-{entity_type}'
        
        return list(zip(tokens, tags))

class NERDataset(Dataset):
    """PyTorch Dataset for NER data"""
    
    def __init__(self, sentences: List[List[Tuple[str, str]]], word2idx: Dict, tag2idx: Dict):
        self.sentences = sentences
        self.word2idx = word2idx
        self.tag2idx = tag2idx
    
    def __len__(self):
        return len(self.sentences)
    
    def __getitem__(self, idx):
        sentence = self.sentences[idx]
        words = [word for word, tag in sentence]
        tags = [tag for word, tag in sentence]
        
        # Convert to indices
        word_ids = [self.word2idx.get(word, self.word2idx['<UNK>']) for word in words]
        tag_ids = [self.tag2idx[tag] for tag in tags]
        
        return torch.tensor(word_ids), torch.tensor(tag_ids)

def collate_fn(batch):
    """Collation function for DataLoader"""
    words, tags = zip(*batch)
    
    # Pad sequences
    words_padded = pad_sequence(words, batch_first=True, padding_value=0)
    tags_padded = pad_sequence(tags, batch_first=True, padding_value=0)
    
    # Create lengths
    lengths = torch.tensor([len(seq) for seq in words])
    
    return words_padded, tags_padded, lengths

class BiLSTMCRF(nn.Module):
    """BiLSTM-CRF model for NER"""
    
    def __init__(self, vocab_size, tagset_size, embedding_dim=100, hidden_dim=128):
        super(BiLSTMCRF, self).__init__()
        
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.tagset_size = tagset_size
        
        # Embedding layer
        self.word_embeds = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        
        # BiLSTM layer
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                           num_layers=1, bidirectional=True, batch_first=True)
        
        # Linear layer to map LSTM output to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)
        
        # Dropout for regularization
        self.dropout = nn.Dropout(0.1)
        
    def forward(self, sentence, lengths):
        # Get embeddings
        embeds = self.word_embeds(sentence)
        embeds = self.dropout(embeds)
        
        # Pack padded sequence
        packed_embeds = nn.utils.rnn.pack_padded_sequence(
            embeds, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        
        # BiLSTM
        lstm_out, _ = self.lstm(packed_embeds)
        
        # Unpack
        lstm_out, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)
        
        # Apply dropout
        lstm_out = self.dropout(lstm_out)
        
        # Get tag scores
        tag_space = self.hidden2tag(lstm_out)
        
        return tag_space

class NERModel:
    """Main NER model class"""
    
    def __init__(self, embedding_dim=100, hidden_dim=128):
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.model = None
        self.word2idx = None
        self.tag2idx = None
        self.idx2tag = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
    def prepare_data(self, logs: List[str]):
        """Prepare data for training"""
        print("Preparing data...")
        
        # Generate BIO tags
        tagger = BIOTagger()
        tagged_sentences = []
        
        for log in logs:
            tagged = tagger.tag_entities(log)
            if tagged:  # Only add non-empty sentences
                tagged_sentences.append(tagged)
        
        print(f"Generated {len(tagged_sentences)} tagged sentences")
        
        # Build vocabularies
        words = []
        tags = []
        
        for sentence in tagged_sentences:
            for word, tag in sentence:
                words.append(word.lower())
                tags.append(tag)
        
        # Create word vocabulary
        word_counts = Counter(words)
        vocab_words = ['<PAD>', '<UNK>'] + [word for word, count in word_counts.most_common()]
        self.word2idx = {word: idx for idx, word in enumerate(vocab_words)}
        
        # Create tag vocabulary
        unique_tags = list(set(tags))
        self.tag2idx = {tag: idx for idx, tag in enumerate(unique_tags)}
        self.idx2tag = {idx: tag for tag, idx in self.tag2idx.items()}
        
        print(f"Vocabulary size: {len(self.word2idx)}")
        print(f"Tag set size: {len(self.tag2idx)}")
        print(f"Tags: {list(self.tag2idx.keys())}")
        
        return tagged_sentences
    
    def create_data_loaders(self, sentences, train_ratio=0.8, batch_size=16):
        """Create train and validation data loaders"""
        # Split data
        split_idx = int(len(sentences) * train_ratio)
        train_sentences = sentences[:split_idx]
        val_sentences = sentences[split_idx:]
        
        # Create datasets
        train_dataset = NERDataset(train_sentences, self.word2idx, self.tag2idx)
        val_dataset = NERDataset(val_sentences, self.word2idx, self.tag2idx)
        
        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=batch_size, 
                                shuffle=True, collate_fn=collate_fn)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, 
                              shuffle=False, collate_fn=collate_fn)
        
        return train_loader, val_loader
    
    def build_model(self):
        """Build the BiLSTM-CRF model"""
        self.model = BiLSTMCRF(
            vocab_size=len(self.word2idx),
            tagset_size=len(self.tag2idx),
            embedding_dim=self.embedding_dim,
            hidden_dim=self.hidden_dim
        ).to(self.device)
        
        return self.model
    
    def train(self, train_loader, val_loader, epochs=20, lr=0.001):
        """Train the model"""
        print(f"Training on {self.device}")
        
        optimizer = optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding
        
        best_val_loss = float('inf')
        
        for epoch in range(epochs):
            # Training
            self.model.train()
            train_loss = 0
            
            for batch_idx, (words, tags, lengths) in enumerate(train_loader):
                words, tags, lengths = words.to(self.device), tags.to(self.device), lengths.to(self.device)
                
                optimizer.zero_grad()
                
                # Forward pass
                tag_scores = self.model(words, lengths)
                
                # Reshape for loss calculation
                tag_scores = tag_scores.view(-1, tag_scores.shape[-1])
                tags = tags.view(-1)
                
                loss = criterion(tag_scores, tags)
                loss.backward()
                optimizer.step()
                
                train_loss += loss.item()
            
            # Validation
            self.model.eval()
            val_loss = 0
            
            with torch.no_grad():
                for words, tags, lengths in val_loader:
                    words, tags, lengths = words.to(self.device), tags.to(self.device), lengths.to(self.device)
                    
                    tag_scores = self.model(words, lengths)
                    tag_scores = tag_scores.view(-1, tag_scores.shape[-1])
                    tags = tags.view(-1)
                    
                    loss = criterion(tag_scores, tags)
                    val_loss += loss.item()
            
            avg_train_loss = train_loss / len(train_loader)
            avg_val_loss = val_loss / len(val_loader)
            
            print(f'Epoch {epoch+1}/{epochs}:')
            print(f'  Train Loss: {avg_train_loss:.4f}')
            print(f'  Val Loss: {avg_val_loss:.4f}')
            
            # Save best model and vocabularies
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                
                # Save model state
                torch.save({
                    'model_state_dict': self.model.state_dict(),
                    'word2idx': self.word2idx,
                    'tag2idx': self.tag2idx,
                    'idx2tag': self.idx2tag,
                    'vocab_size': len(self.word2idx),
                    'tagset_size': len(self.tag2idx),
                    'embedding_dim': self.embedding_dim,
                    'hidden_dim': self.hidden_dim
                }, 'best_ner_model.pth')
                
                print(f"  Model saved with vocab_size={len(self.word2idx)}, tagset_size={len(self.tag2idx)}")
        
        # Also save vocabularies separately for easy access
        import pickle
        with open('vocabularies.pkl', 'wb') as f:
            pickle.dump({
                'word2idx': self.word2idx,
                'tag2idx': self.tag2idx,
                'idx2tag': self.idx2tag
            }, f)
    
    def predict(self, text: str) -> Dict[str, str]:
        """Predict entities in a text"""
        if self.model is None:
            raise ValueError("Model not trained yet!")
        
        self.model.eval()
        
        # Tokenize
        tagger = BIOTagger()
        tokens = tagger.tokenize(text)
        
        # Convert to indices
        word_ids = [self.word2idx.get(token.lower(), self.word2idx['<UNK>']) for token in tokens]
        
        # Create tensors
        words_tensor = torch.tensor([word_ids]).to(self.device)
        lengths_tensor = torch.tensor([len(word_ids)]).to(self.device)
        
        # Predict
        with torch.no_grad():
            tag_scores = self.model(words_tensor, lengths_tensor)
            predicted_tags = torch.argmax(tag_scores, dim=2).squeeze().cpu().numpy()
        
        # Convert back to tags
        if len(predicted_tags.shape) == 0:  # Single token
            predicted_tags = [predicted_tags.item()]
        
        predicted_tag_names = [self.idx2tag[tag_id] for tag_id in predicted_tags]
        
        # Extract entities
        entities = {}
        current_entity = None
        current_tokens = []
        
        for token, tag in zip(tokens, predicted_tag_names):
            if tag.startswith('B-'):
                # Save previous entity
                if current_entity and current_tokens:
                    entities[current_entity] = ' '.join(current_tokens)
                
                # Start new entity
                current_entity = tag[2:]  # Remove 'B-'
                current_tokens = [token]
            elif tag.startswith('I-') and current_entity == tag[2:]:
                current_tokens.append(token)
            else:
                # Save previous entity
                if current_entity and current_tokens:
                    entities[current_entity] = ' '.join(current_tokens)
                current_entity = None
                current_tokens = []
        
        # Don't forget the last entity
        if current_entity and current_tokens:
            entities[current_entity] = ' '.join(current_tokens)
        
        return entities



In [7]:
def main():
    """Main training and evaluation function"""
    print("=== NER Model for System Log Analysis ===\n")
    
    # 1. Generate synthetic data
    print("1. Generating synthetic log data...")
    generator = LogDataGenerator()
    logs = generator.generate_dataset(n_success=100, n_failure=100)
    
    print(f"Generated {len(logs)} log entries")
    print("\nSample logs:")
    for i, log in enumerate(logs[:3]):
        print(f"  {i+1}. {log}")
    
    # 2. Initialize and prepare data
    print("\n2. Preparing data for training...")
    ner_model = NERModel(embedding_dim=100, hidden_dim=256)
    tagged_sentences = ner_model.prepare_data(logs)
    
    # Show sample tagged sentence
    print("\nSample BIO tagged sentence:")
    sample_sentence = tagged_sentences[0]
    for word, tag in sample_sentence:
        print(f"  {word:15} {tag}")
    
    # 3. Create data loaders
    print("\n3. Creating data loaders...")
    train_loader, val_loader = ner_model.create_data_loaders(tagged_sentences, batch_size=4)
    print(f"Train batches: {len(train_loader)}")
    print(f"Validation batches: {len(val_loader)}")
    
    # 4. Build and train model
    print("\n4. Building and training model...")
    ner_model.build_model()
    ner_model.train(train_loader, val_loader, epochs=8, lr=0.001)
    
    # 5. Test inference
    print("\n5. Testing inference...")
    
    test_logs = [
        "2023-06-03 14:22:01 - Login succeeded for user guest from 10.0.0.2",
        "2023-12-15 09:30:45 - Authentication failed for admin from 192.168.1.100 on port 22",
        "2023-08-20 16:45:30 - User john.doe logged in successfully from 172.16.0.50"
    ]
    
    print("\nTest Results:")
    for i, test_log in enumerate(test_logs):
        print(f"\nTest {i+1}:")
        print(f"Input: {test_log}")
        
        try:
            entities = ner_model.predict(test_log)
            print("Extracted entities:")
            for entity_type, value in entities.items():
                print(f"  {entity_type}: {value}")
        except Exception as e:
            print(f"Error during prediction: {e}")
    
    print("\n=== Training Complete! ===")
    print("Model saved as 'best_ner_model.pth'")

if __name__ == "__main__":
    main()

=== NER Model for System Log Analysis ===

1. Generating synthetic log data...
Generated 200 log entries

Sample logs:
  1. 2023-04-18 02:49:35 - Authentication succeeded for service from 192.168.1.99 on port 23
  2. 2023-12-23 07:15:22 - Logged in successfully for user user from 198.51.100.83
  3. 2023-08-09 19:32:14 - Logged in successfully for user alice from 192.168.1.69

2. Preparing data for training...
Preparing data...
Generated 200 tagged sentences
Vocabulary size: 223
Tag set size: 12
Tags: ['I-PORT', 'B-STATUS', 'I-TIMESTAMP', 'I-DURATION', 'B-IP_ADDRESS', 'I-USERNAME', 'B-TIMESTAMP', 'O', 'I-IP_ADDRESS', 'B-DURATION', 'B-USERNAME', 'B-PORT']

Sample BIO tagged sentence:
  2023            B-TIMESTAMP
  -               I-TIMESTAMP
  04              I-TIMESTAMP
  -               I-TIMESTAMP
  18              I-TIMESTAMP
  02              I-TIMESTAMP
  :               I-TIMESTAMP
  49              I-TIMESTAMP
  :               I-TIMESTAMP
  35              I-TIMESTAMP
  -      

In [None]:
import torch
import torch.nn as nn
import re
import json
from typing import List, Dict
import os

class BIOTagger:
    """Convert logs to BIO format for NER training"""
    
    def __init__(self):
        self.entity_patterns = {
            'TIMESTAMP': r'\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}',
            'IP_ADDRESS': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
            'USERNAME': r'(?:user\s+|for\s+)([a-zA-Z0-9._-]+)(?:\s+from|\s+on|\s*$)',
            'PORT': r'port\s+(\d+)',
            'DURATION': r'\(duration:\s+([0-9.]+s)\)',
            'STATUS': r'(successful|succeeded|successfully|failed|failure|denied|granted)'
        }
    
    def tokenize(self, text: str) -> List[str]:
        """Simple tokenization"""
        tokens = re.findall(r'\w+|[^\w\s]', text)
        return tokens

class BiLSTMCRF(nn.Module):
    """BiLSTM-CRF model for NER"""
    
    def __init__(self, vocab_size, tagset_size, embedding_dim=100, hidden_dim=128):
        super(BiLSTMCRF, self).__init__()
        
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.tagset_size = tagset_size
        
        # Embedding layer
        self.word_embeds = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        
        # BiLSTM layer
        self.lstm = nn.LSTM(embedding_dim, hidden_dim=2, num_layers=1, bidirectional=True, batch_first=True)
        
        # Linear layer to map LSTM output to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)
        
        # Dropout for regularization
        self.dropout = nn.Dropout(0.1)
        
    def forward(self, sentence, lengths):
        # Get embeddings
        embeds = self.word_embeds(sentence)
        embeds = self.dropout(embeds)
        
        # Pack padded sequence
        packed_embeds = nn.utils.rnn.pack_padded_sequence(embeds, lengths.cpu(), batch_first=True, enforce_sorted=False)
        
        lstm_out, _ = self.lstm(packed_embeds)
        lstm_out, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)
        lstm_out = self.dropout(lstm_out)
        tag_space = self.hidden2tag(lstm_out)
        
        return tag_space

class NERModelTester:
    """Test the trained NER model"""
    
    def __init__(self, model_path='best_ner_model.pth'):
        self.model_path = model_path
        self.model = None
        self.word2idx = None
        self.tag2idx = None
        self.idx2tag = None
        self.vocab_size = None
        self.tagset_size = None
        self.embedding_dim = None
        self.hidden_dim = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tagger = BIOTagger()
    
    def load_model(self):
        """Load the trained model and vocabularies"""
        if not os.path.exists(self.model_path):
            print(f"Error: Model file '{self.model_path}' not found!")
            print("Please run the training script first to generate the model.")
            return False
        
        try:
            # Load checkpoint with vocabularies
            checkpoint = torch.load(self.model_path, map_location=self.device)
            
            # Load vocabularies from checkpoint
            self.word2idx = checkpoint['word2idx']
            self.tag2idx = checkpoint['tag2idx']
            self.idx2tag = checkpoint['idx2tag']
            self.vocab_size = checkpoint['vocab_size']
            self.tagset_size = checkpoint['tagset_size']
            self.embedding_dim = checkpoint['embedding_dim']
            self.hidden_dim = checkpoint['hidden_dim']
            
            print(f"Loaded vocabularies:")
            print(f"  Vocabulary size: {self.vocab_size}")
            print(f"  Tag set size: {self.tagset_size}")
            print(f"  Tags: {list(self.tag2idx.keys())}")
            
            # Initialize model with correct dimensions
            self.model = BiLSTMCRF(
                vocab_size=self.vocab_size,
                tagset_size=self.tagset_size,
                embedding_dim=self.embedding_dim,
                hidden_dim=self.hidden_dim
            ).to(self.device)
            
            # Load trained weights
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.model.eval()
            
            print(f"Model loaded successfully from {self.model_path}")
            print(f"Running on: {self.device}")
            return True
            
        except Exception as e:
            print(f"Error loading model: {e}")
            return False
    
    def predict(self, text: str) -> Dict[str, str]:
        """Predict entities in a text"""
        if self.model is None:
            raise ValueError("Model not loaded!")
        
        # Tokenize
        tokens = self.tagger.tokenize(text)
        
        # Convert to indices
        word_ids = [self.word2idx.get(token.lower(), self.word2idx['<UNK>']) for token in tokens]
        
        # Create tensors
        words_tensor = torch.tensor([word_ids]).to(self.device)
        lengths_tensor = torch.tensor([len(word_ids)]).to(self.device)
        
        # Predict
        with torch.no_grad():
            tag_scores = self.model(words_tensor, lengths_tensor)
            predicted_tags = torch.argmax(tag_scores, dim=2).squeeze().cpu().numpy()
        
        # Convert back to tags
        if len(predicted_tags.shape) == 0:  # Single token
            predicted_tags = [predicted_tags.item()]
        
        predicted_tag_names = [self.idx2tag[tag_id] for tag_id in predicted_tags]
        
        # Extract entities
        entities = self._extract_entities(tokens, predicted_tag_names)
        
        return entities, list(zip(tokens, predicted_tag_names))
    
    def _extract_entities(self, tokens: List[str], tags: List[str]) -> Dict[str, str]:
        """Extract entities from tokens and tags"""
        entities = {}
        current_entity = None
        current_tokens = []
        
        for token, tag in zip(tokens, tags):
            if tag.startswith('B-'):
                # Save previous entity
                if current_entity and current_tokens:
                    entities[current_entity] = ' '.join(current_tokens)
                
                # Start new entity
                current_entity = tag[2:]  # Remove 'B-'
                current_tokens = [token]
            elif tag.startswith('I-') and current_entity == tag[2:]:
                current_tokens.append(token)
            else:
                # Save previous entity
                if current_entity and current_tokens:
                    entities[current_entity] = ' '.join(current_tokens)
                current_entity = None
                current_tokens = []
        
        # Don't forget the last entity
        if current_entity and current_tokens:
            entities[current_entity] = ' '.join(current_tokens)
        
        return entities
    
    def test_logs(self, test_logs: List[str]):
        """Test the model on a list of log entries"""
        if not self.load_model():
            return
        
        print("\n" + "="*80)
        print("TESTING NER MODEL ON LOG ENTRIES")
        print("="*80)
        
        for i, log in enumerate(test_logs, 1):
            print(f"\n--- Test Case {i} ---")
            print(f"Input Log:")
            print(f"  {log}")
            
            try:
                entities, token_tags = self.predict(log)
                
                print(f"\nToken-level Predictions:")
                for token, tag in token_tags:
                    print(f"  {token:15} -> {tag}")
                
                print(f"\nExtracted Entities:")
                if entities:
                    for entity_type, value in entities.items():
                        print(f"  {entity_type:12}: {value}")
                else:
                    print("  No entities detected")
                
                # Format as JSON for easy copying
                print(f"\nJSON Output:")
                print(f"  {json.dumps(entities, indent=2)}")
                
            except Exception as e:
                print(f"  Error during prediction: {e}")
            
            print("-" * 60)
    
    def interactive_test(self):
        """Interactive testing mode"""
        if not self.load_model():
            return
        
        print("\n" + "="*60)
        print("INTERACTIVE NER TESTING MODE")
        print("Enter log messages to analyze (type 'quit' to exit)")
        print("="*60)
        
        while True:
            try:
                user_input = input("\nEnter log message: ").strip()
                
                if user_input.lower() in ['quit', 'exit', 'q']:
                    print("Goodbye!")
                    break
                
                if not user_input:
                    continue
                
                entities, token_tags = self.predict(user_input)
                
                print("\nToken Analysis:")
                for token, tag in token_tags:
                    print(f"  {token:15} -> {tag}")
                
                print("\nExtracted Entities:")
                if entities:
                    for entity_type, value in entities.items():
                        print(f"  {entity_type:12}: {value}")
                else:
                    print("  No entities detected")
                
            except KeyboardInterrupt:
                print("\nGoodbye!")
                break
            except Exception as e:
                print(f"Error: {e}")

def main():
    """Main testing function"""
    # Sample test logs
    test_logs = [
        "2023-06-03 14:22:01 - Login succeeded for user guest from 10.0.0.2",
        "2023-12-15 09:30:45 - Authentication failed for admin from 192.168.1.100 on port 22",
        "2023-08-20 16:45:30 - User john.doe logged in successfully from 172.16.0.50",
        "2023-11-02 11:15:33 - Access denied for user test from 203.0.113.45",
        "2023-09-18 08:42:17 - Successful login: alice from 10.0.0.25 (duration: 1.2s)",
        "2023-07-25 19:28:54 - Failed login attempt by root from 192.168.1.200 on port 3389",
        "2023-10-12 13:37:22 - Login failure: bob from 172.16.0.99 (invalid credentials)"
    ]
    
    # Initialize tester
    tester = NERModelTester()
    
    # Test on sample logs
    tester.test_logs(test_logs)

if __name__ == "__main__":
    main()

Loaded vocabularies:
  Vocabulary size: 232
  Tag set size: 12
  Tags: ['I-PORT', 'B-STATUS', 'I-TIMESTAMP', 'I-DURATION', 'B-IP_ADDRESS', 'I-USERNAME', 'B-TIMESTAMP', 'O', 'I-IP_ADDRESS', 'B-DURATION', 'B-USERNAME', 'B-PORT']
Model loaded successfully from best_ner_model.pth
Running on: cuda

TESTING NER MODEL ON LOG ENTRIES

--- Test Case 1 ---
Input Log:
  2023-06-03 14:22:01 - Login succeeded for user guest from 10.0.0.2

Token-level Predictions:
  2023            -> B-TIMESTAMP
  -               -> I-TIMESTAMP
  06              -> I-TIMESTAMP
  -               -> I-TIMESTAMP
  03              -> I-TIMESTAMP
  14              -> I-TIMESTAMP
  :               -> I-TIMESTAMP
  22              -> I-TIMESTAMP
  :               -> I-TIMESTAMP
  01              -> I-TIMESTAMP
  -               -> O
  Login           -> O
  succeeded       -> B-STATUS
  for             -> O
  user            -> B-USERNAME
  guest           -> I-USERNAME
  from            -> I-USERNAME
  10              ->

In [None]:
import nltk
nltk.download('wordnet')
from nltk.corpus import wordnet

def get_synonyms(word):
    synonyms = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            synonyms.add(lemma.name())
    return synonyms

succ = []

success_words = get_synonyms("success")
failure_words = get_synonyms("failure")

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Tomas\AppData\Roaming\nltk_data...


In [75]:
failure_words

{'bankruptcy', 'failure', 'loser', 'nonstarter', 'unsuccessful_person'}

In [2]:
import spacy
from transformers import AutoTokenizer, AutoModel

tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')
model = AutoModel.from_pretrained('bert-base-cased', output_hidden_states=True).eval()

w1 = "Failure"
w2 = "Failed"
w3 = "Denied"



tok1 = tokenizer(w1, return_tensors="pt")
tok2 = tokenizer(w2, return_tensors="pt")
tok3 = tokenizer(w3, return_tensors="pt")

# Load spaCy model with word vectors
nlp = spacy.load("en_core_web_md")

# Words to compare
word1 = nlp("failure")
word2 = nlp("failed")
word3 = nlp("denied")

# Compute similarities
print("failure vs failed:", word1.similarity(word2))  # ~0.7–0.9
print("failure vs denied:", word1.similarity(word3))  # ~0.4–0.6

print("Tok 1 looks like:", tok1)


RuntimeError: Failed to import transformers.models.bert.modeling_bert because of the following error (look up to see its traceback):
Traceback (most recent call last):
  File "c:\Users\Tomas\AppData\Local\Programs\Python\Python312\Lib\site-packages\tensorflow\python\pywrap_tensorflow.py", line 73, in <module>
    from tensorflow.python._pywrap_tensorflow_internal import *
ImportError: DLL load failed while importing _pywrap_tensorflow_internal: Proces inicializace dynamicky připojované knihovny (DLL) se nezdařil.


Failed to load the native TensorFlow runtime.
See https://www.tensorflow.org/install/errors for some common causes and solutions.
If you need help, create an issue at https://github.com/tensorflow/tensorflow/issues and include the entire stack trace above this error message.