RAG-based AI Log Analysis with LangChain

Overview

This document outlines a Retrieval-Augmented Generation (RAG) system for log analysis. Log lines are embedded with OpenAI embeddings, indexed in FAISS, and retrieved through LangChain, so that an LLM can answer natural-language questions grounded in the matching log entries.

Implementation Steps

## 1. Import Required Libraries

In [7]:
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import faiss  # not called directly; importing it fails fast if the FAISS backend is missing
from langchain.chains import VectorDBQA
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAI, OpenAIEmbeddings
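
`OpenAIEmbeddings` and `OpenAI` read the API key from the `OPENAI_API_KEY` environment variable by default. A small guard run before the later cells can surface a missing key early rather than partway through indexing. The sketch below is one way to do that; the helper name `require_env` is hypothetical and not part of LangChain or the OpenAI client.

```python
import os


def require_env(name, environ=None):
    """Return the value of an environment variable or raise a clear error.

    `require_env` is an illustrative helper, not a library API.
    `environ` defaults to os.environ and can be replaced in tests.
    """
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Typical use before creating the embeddings:
#     require_env("OPENAI_API_KEY")
```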

## 2. Log Monitor Module

This module uses the watchdog library to monitor log file updates and trigger processing for new log entries.

In [8]:
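class LogMonitor(FileSystemEventHandler):
    def __init__(self, log_file_path, on_update_callback):
        # watchdog may report a different spelling of the path, so compare absolute paths.
        self.log_file_path = os.path.abspath(log_file_path)
        self.on_update_callback = on_update_callback
        self._offset = 0  # byte position already handed to the callback

    def on_modified(self, event):
        if os.path.abspath(event.src_path) != self.log_file_path:
            return
        with open(self.log_file_path, 'rb') as f:
            # A file shorter than the saved offset was truncated or rotated.
            if os.path.getsize(self.log_file_path) < self._offset:
                self._offset = 0
            f.seek(self._offset)
            data = f.read()
            self._offset = f.tell()
        # Only lines appended since the last event are passed on, avoiding duplicate entries.
        lines = data.decode('utf-8', errors='replace').splitlines()
        if lines:
            self.on_update_callback(lines)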
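
A writer can flush a log line in two pieces, so a modification event may arrive while the last line is still incomplete. The following is a minimal sketch, using only the standard library, of reading appended data while holding back a trailing partial line until its newline arrives. `LineTailer` and `read_new_lines` are hypothetical names, and the temporary file stands in for a real log.

```python
import os
import tempfile


class LineTailer:
    """Hypothetical helper: return only complete lines appended since the last call."""

    def __init__(self, path):
        self.path = path
        self._offset = 0      # bytes already consumed from the file
        self._partial = b""   # trailing bytes that do not end in a newline yet

    def read_new_lines(self):
        # A file smaller than the saved offset was truncated or rotated.
        if os.path.getsize(self.path) < self._offset:
            self._offset, self._partial = 0, b""
        with open(self.path, "rb") as f:
            f.seek(self._offset)
            data = self._partial + f.read()
            self._offset = f.tell()
        # Everything before the last newline is complete; the rest is kept for later.
        *complete, self._partial = data.split(b"\n")
        return [line.decode("utf-8", errors="replace") for line in complete]


# Demo with a temporary file standing in for a real log.
with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "app.log")
    with open(log_path, "wb") as f:
        f.write(b"first line\nsecond li")   # second line is still being written
    tailer = LineTailer(log_path)
    first_batch = tailer.read_new_lines()
    with open(log_path, "ab") as f:
        f.write(b"ne\nthird line\n")
    second_batch = tailer.read_new_lines()

print(first_batch)
print(second_batch)
```

The same buffering could be folded into `on_modified` if partial lines turn out to be a problem in practice.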


## 3. Preprocessor Module

This module prepares raw log entries for vectorization by performing basic text cleaning and normalization.

In [9]:
def preprocess_logs(log_lines):
    # Strip whitespace, lowercase, and drop blank lines so no empty strings are embedded.
    return [line.strip().lower() for line in log_lines if line.strip()]
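
Beyond stripping and lowercasing, normalization can also mask fields that change on every line, such as timestamps, IP addresses, and numeric IDs, so that otherwise identical events look alike to the embedding model. The sketch below illustrates the idea with the standard `re` module. The function name `normalize_log_line` and the three patterns are assumptions chosen for the example, not rules taken from this document, and masking discards detail that some queries may need, so the raw line may be worth keeping alongside.

```python
import re

# Illustrative patterns only; real log formats will need their own rules.
_TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2}[ t]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?")
_IPV4 = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")
_NUMBER = re.compile(r"\b\d+\b")


def normalize_log_line(line):
    """Hypothetical helper: lowercase a line and mask volatile fields."""
    text = line.strip().lower()
    text = _TIMESTAMP.sub("<ts>", text)   # timestamps first, before bare numbers
    text = _IPV4.sub("<ip>", text)        # then dotted IPv4 addresses
    text = _NUMBER.sub("<num>", text)     # finally any remaining integers
    return re.sub(r"\s+", " ", text)      # collapse runs of whitespace


example = normalize_log_line(
    "2024-05-01 12:30:45,123 ERROR Login failed for user 42 from 10.0.0.7"
)
print(example)
```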

## 4. Vectorizer and FAISS Integration with LangChain

The integration of FAISS and LangChain enables efficient storage and retrieval of log entries based on semantic similarity.

In [10]:
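from langchain_core.prompts import PromptTemplate

class LangChainFAISSHandler:
    def __init__(self):
        self.embedding = OpenAIEmbeddings()
        # FAISS cannot be constructed from an embedding function alone, so the
        # store is created lazily from the first batch of logs.
        self.vector_store = None
        self.system_context = "You are a helpful and intelligent log analyzer. Answer user queries based on the provided log data."

    def add_logs(self, logs):
        if not logs:
            return
        if self.vector_store is None:
            self.vector_store = FAISS.from_texts(logs, self.embedding)
        else:
            # One batched call instead of one embedding request per line.
            self.vector_store.add_texts(logs)

    def query_logs(self, query):
        if self.vector_store is None:
            return "No log entries have been indexed yet."
        # The system context goes into the prompt template; the query stays a
        # plain string so it can be embedded for retrieval.
        prompt = PromptTemplate(
            template=self.system_context + "\n\nLog entries:\n{context}\n\nQuestion: {question}\nAnswer:",
            input_variables=["context", "question"],
        )
        # VectorDBQA is deprecated in newer LangChain releases in favor of RetrievalQA.
        qa_chain = VectorDBQA.from_chain_type(
            llm=OpenAI(),
            vectorstore=self.vector_store,
            chain_type_kwargs={"prompt": prompt},
        )
        return qa_chain.invoke({"query": query})["result"]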
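
To make "retrieval based on semantic similarity" concrete, the toy below ranks log lines against a query and returns the closest ones. It is only a sketch of the ranking idea: the real handler uses dense OpenAI embedding vectors and FAISS's own distance metric, whereas this example swaps in word counts and cosine similarity so that it runs with the standard library alone. The names `embed`, `cosine`, and `top_k` are hypothetical.

```python
import math
from collections import Counter


def embed(text):
    """Toy stand-in for an embedding model: word counts as a sparse vector."""
    return Counter(text.lower().split())


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


def top_k(query, texts, k=2):
    """Return the k texts most similar to the query, best first."""
    q = embed(query)
    ranked = sorted(texts, key=lambda t: cosine(q, embed(t)), reverse=True)
    return ranked[:k]


logs = [
    "login failed for user alice",
    "disk usage at 91 percent",
    "login failed for user bob",
    "scheduled backup completed",
]
hits = top_k("repeated login failed", logs, k=2)
print(hits)
```

Only the retrieved lines are handed to the LLM as context, which is what keeps the prompt small even when the log is large.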


## 5. Integration Workflow

This workflow ties together log monitoring, preprocessing, vectorization, and querying into a seamless pipeline.

In [11]:
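from watchdog.events import FileModifiedEvent

def process_logs_and_query(log_file_path, query):
    # One handler instance is shared by the monitor callback and the query.
    faiss_handler = LangChainFAISSHandler()

    # Step 1: Monitor Logs
    def on_log_update(new_logs):
        processed_logs = preprocess_logs(new_logs)
        faiss_handler.add_logs(processed_logs)

    log_monitor = LogMonitor(log_file_path, on_log_update)
    # Replay one modification event so the existing file contents are indexed before querying.
    log_monitor.on_modified(FileModifiedEvent(log_file_path))
    observer = Observer()
    # abspath keeps dirname from returning '' when only a file name is given.
    watch_dir = os.path.dirname(os.path.abspath(log_file_path))
    observer.schedule(log_monitor, path=watch_dir, recursive=False)
    observer.start()

    # Step 2: Query Logs, then stop the watcher thread so it does not outlive the call.
    try:
        return faiss_handler.query_logs(query)
    finally:
        observer.stop()
        observer.join()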
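
watchdog delivers events on its own thread, so the update callback can call `add_logs` while the main thread is inside `query_logs`. Assuming the handler is not safe for concurrent use (the source does not say either way), one option is to serialize both methods behind a lock. The sketch below shows that pattern with a hypothetical `ThreadSafeHandler` wrapper and an in-memory stand-in for the real handler, so it runs without any API key.

```python
import threading


class ThreadSafeHandler:
    """Hypothetical wrapper that serializes calls to an underlying handler."""

    def __init__(self, handler):
        self._handler = handler
        self._lock = threading.Lock()

    def add_logs(self, logs):
        with self._lock:
            self._handler.add_logs(logs)

    def query_logs(self, query):
        with self._lock:
            return self._handler.query_logs(query)


class InMemoryHandler:
    """Stand-in used only for this demo; it stores lines in a plain list."""

    def __init__(self):
        self.lines = []

    def add_logs(self, logs):
        self.lines.extend(logs)

    def query_logs(self, query):
        return [line for line in self.lines if query in line]


safe = ThreadSafeHandler(InMemoryHandler())
# Simulate several watcher callbacks arriving at once.
writers = [
    threading.Thread(target=safe.add_logs, args=([f"login failed #{i}"],))
    for i in range(20)
]
for t in writers:
    t.start()
for t in writers:
    t.join()
matches = safe.query_logs("login failed")
print(len(matches))
```

The lock makes a query wait for an in-progress update and vice versa, which trades some latency for a consistent view of the index.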

In [None]:
# Example usage
log_file_path = '/path/to/logfile.log'
query = "Why are there repeated login failures?"
result = process_logs_and_query(log_file_path, query)
print("Query Result:", result)