In [1]:
def read_log_file(file_path):
    """Read a log file and return its lines.

    param file_path: Path of the log file to read.
    return: A list with one string per line; line terminators are kept.

    The file is decoded as UTF-8 regardless of the platform locale. Bytes that
    are not valid UTF-8 are replaced with U+FFFD instead of raising, because
    log files frequently contain stray binary output.
    """
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        return file.readlines()

def chunk_text(lines, max_length=512):
    """Group consecutive lines into chunks of roughly ``max_length`` characters.

    param lines: Iterable of strings (typically lines that still carry their
    newline terminator). Lines are concatenated as-is, without a separator.
    param max_length: Maximum number of characters per chunk.
    return: A list of non-empty strings.

    Lines are never split: a single line longer than ``max_length`` becomes a
    chunk of its own and therefore exceeds the limit. No empty chunk is emitted
    in front of such a line.
    """
    chunks = []
    pending = []      # lines collected for the chunk being built
    pending_len = 0   # total characters currently in `pending`
    for line in lines:
        # Flush only when something has been collected; otherwise an oversized
        # first line would produce a leading empty chunk.
        if pending_len and pending_len + len(line) > max_length:
            chunks.append("".join(pending))
            pending = []
            pending_len = 0
        pending.append(line)
        pending_len += len(line)
    if pending_len:
        chunks.append("".join(pending))
    return chunks


In [None]:

# Example usage: load a log file as a list of lines (newlines preserved).
file_path = 'data/log1.out'
log_lines = read_log_file(file_path)

# Group the lines into chunks before handing them to a transformer model.
# NOTE: chunk_text measures max_length in characters (it compares len() of
# the strings), not in tokens, so 512 here is only a rough stand-in for a
# model's 512-token input limit.
log_chunks = chunk_text(log_lines, max_length=512)

# log_chunks is now a list of strings, each made of whole consecutive lines.

In [None]:
import numpy as np
from scipy.spatial.distance import cosine
import torch

def is_text_redundant(text, sequence_length=5):
    """
    Check if a text is redundant.

    A text is considered redundant when any run of ``sequence_length``
    consecutive whitespace-separated words occurs more than once in it.

    param text: The text to be checked for redundancy.
    param sequence_length (default=5): Number of consecutive words per window.
    return: True if some word window repeats, False otherwise (including
    texts with fewer than ``sequence_length`` words).
    raises ValueError: If ``sequence_length`` is smaller than 1.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    words = text.split()
    seen = set()
    # There are len(words) - sequence_length + 1 windows; the "+ 1" makes sure
    # the final window of the text is examined as well.
    for i in range(len(words) - sequence_length + 1):
        sequence = ' '.join(words[i:i + sequence_length])
        if sequence in seen:
            return True
        seen.add(sequence)
    return False

def determine_overlap(text, max_length=512):
    """
    Pick an overlap size (in characters) for splitting a text into chunks.

    Three outcomes are possible:
      * 20 when the text is shorter than ``max_length``;
      * 80 when the text is at least ``max_length`` long and
        ``is_text_redundant`` reports it as redundant;
      * 50 for any other text of at least ``max_length`` characters.

    param text: The text for which to determine the overlap.
    param max_length: The maximum length of each chunk in characters.
    return: The recommended overlap size in characters.
    """
    base_overlap = 20        # short texts
    extended_overlap = 50    # long texts
    redundancy_bonus = 30    # added on top for long, redundant texts

    # Short texts never reach the redundancy check.
    if len(text) < max_length:
        return base_overlap

    if is_text_redundant(text):
        return extended_overlap + redundancy_bonus

    return extended_overlap

def split_context(context, max_length=512):
    """
    Function for splitting context into overlapping chunks.

    param context: This is the text that you want to split into chunks.

    param max_length (default=512): This is the maximum length of each chunk
    in characters. Every chunk has exactly this length, with the exception of
    the last chunk, which may be shorter.

    The overlap between consecutive chunks is not a parameter: it is obtained
    from determine_overlap(context, max_length) and then capped at
    max_length - 1 so that every step moves forward by at least one character
    (without the cap, an overlap >= max_length would never terminate).

    The function returns a list of chunks, where each chunk is a slice of the
    context. An empty context yields an empty list.

    raises ValueError: If max_length is smaller than 1 and the context is not
    empty.
    """
    if not context:
        return []
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    # Cap the overlap so that `start` strictly increases on every iteration.
    overlap = min(determine_overlap(context, max_length), max_length - 1)

    total = len(context)
    chunks = []
    start = 0
    while start < total:
        end = min(start + max_length, total)
        chunks.append(context[start:end])
        if end == total:
            break
        # The next chunk re-reads the last `overlap` characters of this one.
        start = end - overlap
    return chunks


def answer_question(model, tokenizer, context, question):
    """
    The function answers questions given context and question.

    param model: This is the model that you're using to generate answers to
    the questions. It is called with the encoded inputs as keyword arguments
    and its result must expose "start_logits" and "end_logits".

    param tokenizer: This is the tokenizer that corresponds to your model.
    It's used to encode the (question, context) pair, truncated/padded to 512
    tokens, and to map token ids back to token strings.

    param context: This is the text that the model will look at to find an
    answer to the question.

    param question: This is the question that you're asking the model.

    The function returns the answer as a string: the tokens between the
    position with the highest start score and the position with the highest
    end score (inclusive), joined with single spaces. If the best end position
    lies before the best start position, the two positions are swapped so the
    span is never empty. Because raw tokens are joined, the result may contain
    special or sub-word tokens exactly as the tokenizer spells them.
    """
    # Encode the question/context pair
    encoded = tokenizer.encode_plus(question, context, truncation=True, padding='max_length', max_length=512, return_tensors='pt')

    # Pure inference: skip autograd bookkeeping so no graph is kept per call
    with torch.no_grad():
        result = model(**encoded)

    # Positions of the highest start and end scores, as plain Python ints
    answer_start = int(torch.argmax(result["start_logits"]))
    answer_end = int(torch.argmax(result["end_logits"]))

    # If the end position precedes the start position, swap them
    if answer_end < answer_start:
        answer_start, answer_end = answer_end, answer_start

    # Get the tokens for the answer
    all_tokens = tokenizer.convert_ids_to_tokens(encoded['input_ids'][0])
    return ' '.join(all_tokens[answer_start:answer_end + 1])


def vectorize_text(model, tokenizer, input_string):
    """
    Vectorize a given input string.

    param model: This is the model used to encode the input string. Its output
    must expose ``last_hidden_state``.

    param tokenizer: This is the tokenizer that corresponds to your model.
    It's used to convert your text data into a format that the model can
    understand.

    param input_string: This is the text that you want to vectorize. Text that
    tokenizes to more tokens than the tokenizer's configured maximum is
    truncated instead of being passed to the model at full length.

    The function returns a numpy array obtained by averaging the last hidden
    state over dimension 1 (presumably the token axis, giving one row per
    input — confirm against the model in use).
    """
    # Encode the input string; truncate so over-long text cannot exceed the
    # model's input limit.
    inputs = tokenizer.encode_plus(
        input_string,
        add_special_tokens=True,
        truncation=True,
        return_tensors="pt"
    )

    # Pure inference: no gradients are needed
    with torch.no_grad():
        outputs = model(**inputs)

    # Get the embeddings from the last hidden state
    embeddings = outputs.last_hidden_state

    # Average the embeddings
    vector = torch.mean(embeddings, dim=1)

    # Convert tensor to numpy array (moved to host memory first; a no-op on CPU)
    return vector.detach().cpu().numpy()


def calculate_similarity(question_vector, answer_vector):
    """Return the cosine similarity of a question vector and an answer vector.

    param question_vector: Indexable container whose first element is the
    1-D vector representing the question.

    param answer_vector: Indexable container whose first element is the
    1-D vector representing the answer.

    Only element 0 of each argument is used. The result is one minus the
    cosine distance between those two vectors, so values close to 1 mean the
    vectors point in nearly the same direction.
    """
    question_row = question_vector[0]
    answer_row = answer_vector[0]

    # scipy reports a distance; convert it to a similarity.
    distance = cosine(question_row, answer_row)
    return 1 - distance


def find_best_answer(model, tokenizer, context, question, model_vec, num_answers=3, overlap=50, max_length=512):
    """Find the best answers to the question given a long context

    param model: This is the model that you're using to generate answers to
    the questions (passed to answer_question).

    param tokenizer: This is the tokenizer used both for answering and for
    vectorizing; it is passed to answer_question and to vectorize_text.

    param context: This is the text that the model will look at to find an
    answer to the question. It is split into chunks with split_context.

    param question: This is the question that you're asking the model.

    param model_vec: This is the model used to vectorize the question and
    each candidate answer (passed to vectorize_text).

    param num_answers (default=3): This is the number of best answers the
    function will return. Values below 1 yield an empty list.

    param overlap (default=50): Currently unused. It is kept for interface
    compatibility; split_context is called without it.

    param max_length (default=512): This is the maximum length of each chunk,
    forwarded to split_context.

    The function returns a list of num_answers tuples (answer, similarity),
    sorted in ascending order of similarity, so the last element is the answer
    most similar to the question. When fewer candidates than num_answers were
    accepted, the remaining slots hold the placeholder (None, -1) at the front
    of the list.
    """
    # Nothing to rank; this also avoids indexing into an empty result list.
    if num_answers < 1:
        return []

    # Vectorize the question
    question_vector = vectorize_text(model_vec, tokenizer, question)

    # Placeholders, kept sorted so that index 0 is always the weakest entry
    best_answers = [(None, -1) for _ in range(num_answers)]

    # Generate an answer for each chunk and update the best answers if necessary
    for chunk in split_context(context, max_length):
        answer = answer_question(model, tokenizer, chunk, question)
        if answer is None:
            continue
        answer_vector = vectorize_text(model_vec, tokenizer, answer)
        if answer_vector is None:
            continue
        similarity = calculate_similarity(question_vector, answer_vector)
        # Only a strictly better candidate displaces the current weakest one
        if similarity > best_answers[0][1]:
            best_answers[0] = (answer, similarity)
            # Re-sort so the lowest similarity is first again
            best_answers.sort(key=lambda entry: entry[1])

    # Return the answers along with their similarities
    return best_answers