In [33]:
import pandas as pd
import numpy as np
import json
import time
import os
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from langchain.schema import HumanMessage
from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field
from collections import Counter
import warnings
import pickle
from pathlib import Path
import datetime
warnings.filterwarnings('ignore')

print("LLM v2: Bulletproof Processing with Checkpoints (1000 logs)")



LLM v2: Bulletproof Processing with Checkpoints (1000 logs)


In [34]:
# Create checkpoint system
checkpoint_dir = Path("llm_checkpoints")
checkpoint_dir.mkdir(exist_ok=True)

# Load environment and data
load_dotenv()
df = pd.read_csv('../data/nova_logs_with_bert.csv')
unclassified_logs = df[(df['regex_label'].isnull()) & (df['bert_label'].isnull())].copy()
print(f"Unclassified logs available: {len(unclassified_logs)}")

# Global variables for tracking
RESULTS_LIST = []
PROCESSED_COUNT = 0
CONSECUTIVE_FAILURES = 0
MAX_CONSECUTIVE_FAILURES = 5  # Stop after 5 consecutive failures


Unclassified logs available: 14972


In [35]:
def save_checkpoint_immediately(results_list, count, filename="llm_v2_checkpoint.pkl"):
    """Save checkpoint immediately - never lose results"""
    checkpoint_data = {
        'results': results_list,
        'processed_count': count,
        'timestamp': datetime.datetime.now(),
        'success_rate': len([r for r in results_list if r.category not in ['Processing_Error', 'Rate_Limit_Error']]) / max(len(results_list), 1)
    }
    
    checkpoint_path = checkpoint_dir / filename
    with open(checkpoint_path, 'wb') as f:
        pickle.dump(checkpoint_data, f)
    
    # Also save as CSV for safety
    if results_list:
        results_data = []
        for i, result in enumerate(results_list):
            results_data.append({
                'log_index': i,
                'llm_category': result.category,
                'llm_confidence': result.confidence,
                'llm_reasoning': result.reasoning
            })
        
        results_df = pd.DataFrame(results_data)
        results_df.to_csv(f'llm_v2_results_backup_{count}.csv', index=False)
    
    print(f"✅ Checkpoint saved: {count} logs processed")

def load_existing_checkpoint(filename="llm_v2_checkpoint.pkl"):
    """Load existing checkpoint if available"""
    checkpoint_path = checkpoint_dir / filename
    
    if checkpoint_path.exists():
        with open(checkpoint_path, 'rb') as f:
            checkpoint_data = pickle.load(f)
        
        print(f"📁 Checkpoint found: {checkpoint_data['processed_count']} logs previously processed")
        print(f"📊 Previous success rate: {checkpoint_data['success_rate']:.2%}")
        return checkpoint_data['results'], checkpoint_data['processed_count']
    
    return [], 0

# Load any existing checkpoint
RESULTS_LIST, PROCESSED_COUNT = load_existing_checkpoint()
print(f"Starting from: {PROCESSED_COUNT} logs already processed")


Starting from: 0 logs already processed


In [36]:
def create_strategic_1k_sample(unclassified_logs, target_size=1000, skip_processed=0):
    """Create strategic sample of 1k logs, skipping already processed"""
    
    strategic_sample = []
    
    # Priority 1: ERROR and WARNING logs (400 logs - 40%)
    error_logs = unclassified_logs[
        unclassified_logs['raw_log_text'].str.contains(
            'ERROR|WARNING|CRITICAL|TIMEOUT|FAILED', case=False, na=False
        )
    ]
    priority_1 = error_logs.sample(n=min(400, len(error_logs)), random_state=42)
    strategic_sample.append(priority_1)
    
    # Priority 2: Cluster-based sampling (600 logs - 60%)
    cluster_targets = {3: 200, 5: 150, 6: 150, 9: 50, 13: 50}
    
    used_indices = priority_1.index
    for cluster_id, target_count in cluster_targets.items():
        cluster_logs = unclassified_logs[
            (unclassified_logs['cluster_id'] == cluster_id) & 
            (~unclassified_logs.index.isin(used_indices))
        ]
        
        if len(cluster_logs) > 0:
            sample_size = min(target_count, len(cluster_logs))
            cluster_sample = cluster_logs.sample(n=sample_size, random_state=42)
            strategic_sample.append(cluster_sample)
            used_indices = used_indices.union(cluster_sample.index)
    
    # Combine and skip already processed
    final_sample = pd.concat(strategic_sample, ignore_index=True)
    
    if skip_processed > 0:
        final_sample = final_sample.iloc[skip_processed:].reset_index(drop=True)
        print(f"📋 Skipping {skip_processed} already processed logs")
    
    return final_sample

# Create sample, skipping already processed logs
strategic_1k_logs = create_strategic_1k_sample(unclassified_logs, target_size=1000, skip_processed=PROCESSED_COUNT)
print(f"Strategic sample created: {len(strategic_1k_logs)} logs to process")


Strategic sample created: 1000 logs to process


In [37]:

import os
from dotenv import load_dotenv

# Path to .env file in the parent directory
dotenv_path = os.path.abspath(os.path.join(os.getcwd(), '../.env'))
print(f"📍 Loading .env from: {dotenv_path}")

# Load .env file
load_dotenv(dotenv_path)



# Initialize LangChain Groq client with llama-3.1-8b-instant
try:
    llm = ChatGroq(
        groq_api_key=os.getenv('GROQ_API_KEY'),
        model_name='llama-3.1-8b-instant',  # 5x more tokens than 70b
        temperature=0.3,
        max_tokens=120  # Reduced for efficiency
    )
    
    # Test connection
    test_response = llm.invoke([HumanMessage(content="Test connection. Reply 'OK'.")])
    print("LLM connection successful")
    
except Exception as e:
    print(f"LLM initialization failed: {e}")
    print("Please check your GROQ_API_KEY in .env file")
    raise


python-dotenv could not parse statement starting at line 1


📍 Loading .env from: /Users/kxshrx/dev/log-classification/.env
LLM connection successful


In [38]:
# Pydantic model
class LogClassification(BaseModel):
    category: str = Field(..., description="Classification category")
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score")
    reasoning: str = Field(..., description="Brief explanation")

# Initialize LLM
llm = ChatGroq(
    groq_api_key=os.getenv('GROQ_API_KEY'),
    model_name='llama-3.1-8b-instant',
    temperature=0.3,
    max_tokens=120
)

# Optimized prompt
optimized_template = """Classify OpenStack log:

CATEGORIES:
SysOps, InstMgmt, NetOps, ResMgmt, SchedOps, BootErr, NetErr, FileErr, ConfigErr, ResErr, SvcErr

EXAMPLES:
- "WARNING _wait_for_boot timeout" → BootErr
- "INFO VIF plugged successfully" → NetOps  
- "ERROR file not found" → FileErr

LOG: {log_message}

JSON: {{"category": "X", "confidence": 0.8, "reasoning": "brief"}}"""

prompt = PromptTemplate(input_variables=["log_message"], template=optimized_template)

def classify_with_failure_detection(log_message, llm, prompt_template):
    """Classify with enhanced failure detection"""
    try:
        formatted_prompt = prompt_template.format(log_message=log_message[:400])
        messages = [HumanMessage(content=formatted_prompt)]
        
        response = llm.invoke(messages)
        response_text = response.content.strip()
        
        # Parse JSON
        import re
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if json_match:
            json_str = json_match.group()
            result_data = json.loads(json_str)
        else:
            result_data = json.loads(response_text)
        
        # Map categories
        category_mapping = {
            'SysOps': 'System_Operations', 'InstMgmt': 'Instance_Management', 
            'NetOps': 'Network_Operations', 'ResMgmt': 'Resource_Management',
            'SchedOps': 'Scheduler_Operations', 'BootErr': 'Boot_Timeout_Errors',
            'NetErr': 'Network_Connection_Errors', 'FileErr': 'File_System_Errors',
            'ConfigErr': 'Configuration_Errors', 'ResErr': 'Resource_Allocation_Errors',
            'SvcErr': 'Service_Communication_Errors'
        }
        
        category = result_data.get('category', result_data.get('cat', 'Unknown'))
        if category in category_mapping:
            category = category_mapping[category]
        
        result = LogClassification(
            category=category,
            confidence=result_data.get('confidence', result_data.get('conf', 0.7)),
            reasoning=result_data.get('reasoning', 'Classified')
        )
        
        return result, False  # Success, no failure
        
    except Exception as e:
        error_str = str(e).lower()
        
        # Detect rate limit errors
        if any(term in error_str for term in ['rate limit', '429', 'too many requests', 'quota']):
            return LogClassification(
                category="Rate_Limit_Error",
                confidence=0.0,
                reasoning="Rate limit exceeded"
            ), True  # Failure detected
        
        # Other errors
        return LogClassification(
            category="Processing_Error",
            confidence=0.0,
            reasoning=f"Error: {str(e)[:50]}"
        ), True  # Failure detected

print("Enhanced classification function with failure detection created")


Enhanced classification function with failure detection created


In [39]:
def process_with_early_stopping_and_checkpoints(logs_list, llm, prompt_template, start_from=0):
    """Process logs with bulletproof checkpointing and early stopping"""
    
    global RESULTS_LIST, PROCESSED_COUNT, CONSECUTIVE_FAILURES
    
    total_logs = len(logs_list)
    print(f"🚀 Starting processing from log {start_from + 1}/{total_logs}")
    print(f"📊 Early stopping after {MAX_CONSECUTIVE_FAILURES} consecutive failures")
    
    for idx in range(start_from, total_logs):
        log_message = logs_list[idx]
        
        # Progress tracking
        if (idx + 1) % 10 == 0:
            success_rate = len([r for r in RESULTS_LIST if r.category not in ['Processing_Error', 'Rate_Limit_Error']]) / max(len(RESULTS_LIST), 1)
            print(f"📈 Processed {idx + 1}/{total_logs} ({(idx+1)/total_logs*100:.1f}%) - Success rate: {success_rate:.1%}")
        
        # Classify log
        result, is_failure = classify_with_failure_detection(log_message, llm, prompt_template)
        
        # Add result to global list
        RESULTS_LIST.append(result)
        PROCESSED_COUNT += 1
        
        # Handle failures
        if is_failure:
            CONSECUTIVE_FAILURES += 1
            print(f" Failure {CONSECUTIVE_FAILURES}/{MAX_CONSECUTIVE_FAILURES}: {result.category}")
            
            # Early stopping condition
            if CONSECUTIVE_FAILURES >= MAX_CONSECUTIVE_FAILURES:
                print(f"\nEARLY STOPPING: {CONSECUTIVE_FAILURES} consecutive failures detected")
                print(f" Processed {len(RESULTS_LIST)} logs before stopping")
                break
        else:
            CONSECUTIVE_FAILURES = 0  # Reset on success
        
        # Save checkpoint every 25 logs
        if (idx + 1) % 25 == 0:
            save_checkpoint_immediately(RESULTS_LIST, PROCESSED_COUNT)
        
        # Rate limiting delay
        time.sleep(2.5)
    
    # Final checkpoint save
    save_checkpoint_immediately(RESULTS_LIST, PROCESSED_COUNT)
    
    print(f"\n✅ Processing completed: {len(RESULTS_LIST)} total classifications")
    return RESULTS_LIST

# Execute processing
strategic_logs_list = strategic_1k_logs['raw_log_text'].tolist()
start_time = datetime.datetime.now()
print(f"⏰ Start time: {start_time.strftime('%H:%M:%S')}")

final_results = process_with_early_stopping_and_checkpoints(
    strategic_logs_list, 
    llm, 
    prompt,
    start_from=0
)

end_time = datetime.datetime.now()
print(f"⏰ End time: {end_time.strftime('%H:%M:%S')}")
print(f"⏱️ Duration: {end_time - start_time}")


⏰ Start time: 15:53:37
🚀 Starting processing from log 1/1000
📊 Early stopping after 5 consecutive failures
 Failure 1/5: Processing_Error
 Failure 1/5: Rate_Limit_Error
 Failure 2/5: Processing_Error
 Failure 3/5: Processing_Error
 Failure 4/5: Processing_Error
 Failure 5/5: Rate_Limit_Error

EARLY STOPPING: 5 consecutive failures detected
 Processed 7 logs before stopping
✅ Checkpoint saved: 7 logs processed

✅ Processing completed: 7 total classifications
⏰ End time: 15:57:06
⏱️ Duration: 0:03:29.546919


In [40]:
# Safe analysis with no division by zero
def safe_analyze_results(results_list):
    """Analyze results safely without division by zero errors"""
    
    if not results_list:
        print("⚠️ No results to analyze")
        return
    
    print(f"\n📊 FINAL RESULTS ANALYSIS")
    print(f"=" * 50)
    
    # Extract data safely
    categories = [r.category for r in results_list]
    confidences = [r.confidence for r in results_list if isinstance(r.confidence, (int, float))]
    
    # Category distribution
    category_dist = Counter(categories)
    print(f"📈 Total processed: {len(results_list)} logs")
    print(f"\n🏷️ Category Distribution:")
    for category, count in category_dist.most_common():
        percentage = count / len(results_list) * 100
        print(f"  {category}: {count} ({percentage:.1f}%)")
    
    # Confidence analysis (safe)
    if confidences:
        avg_confidence = sum(confidences) / len(confidences)
        high_conf_count = len([c for c in confidences if c >= 0.65])
        print(f"\n🎯 Confidence Analysis:")
        print(f"  Average confidence: {avg_confidence:.3f}")
        print(f"  High confidence (≥0.65): {high_conf_count}/{len(confidences)} ({high_conf_count/len(confidences)*100:.1f}%)")
    
    # Success rate analysis
    successful_classifications = len([r for r in results_list if r.category not in ['Processing_Error', 'Rate_Limit_Error']])
    success_rate = successful_classifications / len(results_list) * 100
    print(f"\n✅ Success Rate: {success_rate:.1f}% ({successful_classifications}/{len(results_list)})")
    
    # Error analysis
    errors = len([r for r in results_list if 'Error' in r.category])
    if errors > 0:
        print(f"❌ Errors encountered: {errors} ({errors/len(results_list)*100:.1f}%)")

# Analyze results safely
safe_analyze_results(RESULTS_LIST)

# Save final results
if RESULTS_LIST:
    final_results_data = []
    for i, result in enumerate(RESULTS_LIST):
        final_results_data.append({
            'log_index': i,
            'llm_category': result.category,
            'llm_confidence': result.confidence,
            'llm_reasoning': result.reasoning
        })
    
    final_df = pd.DataFrame(final_results_data)
    final_df.to_csv('llm_v2_final_results_1000.csv', index=False)
    
    # Save integration-ready file
    processed_logs = strategic_1k_logs.head(len(RESULTS_LIST)).copy()
    processed_logs['llm_category'] = [r.category for r in RESULTS_LIST]
    processed_logs['llm_confidence'] = [r.confidence for r in RESULTS_LIST]
    processed_logs['llm_reasoning'] = [r.reasoning for r in RESULTS_LIST]
    processed_logs.to_csv('strategic_1k_with_llm_results.csv', index=False)
    
    print(f"\n💾 FILES SAVED:")
    print(f"✅ llm_v2_final_results_1000.csv ({len(final_df)} classifications)")
    print(f"✅ strategic_1k_with_llm_results.csv (integration ready)")
    print(f"✅ Checkpoint files in llm_checkpoints/ directory")

print(f"\n🎉 LLM v2 PROCESSING COMPLETE")
print(f"📊 Successfully processed {len(RESULTS_LIST)} logs with bulletproof error handling")



📊 FINAL RESULTS ANALYSIS
📈 Total processed: 7 logs

🏷️ Category Distribution:
  Processing_Error: 4 (57.1%)
  Rate_Limit_Error: 2 (28.6%)
  Service_Communication_Errors: 1 (14.3%)

🎯 Confidence Analysis:
  Average confidence: 0.114
  High confidence (≥0.65): 1/7 (14.3%)

✅ Success Rate: 14.3% (1/7)
❌ Errors encountered: 7 (100.0%)

💾 FILES SAVED:
✅ llm_v2_final_results_1000.csv (7 classifications)
✅ strategic_1k_with_llm_results.csv (integration ready)
✅ Checkpoint files in llm_checkpoints/ directory

🎉 LLM v2 PROCESSING COMPLETE
📊 Successfully processed 7 logs with bulletproof error handling
