In [8]:
# pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [1]:
import os
import pandas as pd
import re
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [2]:
# Function to load the CSV file
def load_csv(file_path):
    df = pd.read_csv(file_path)
    return df

In [3]:
# Function to preprocess full log messages by replacing variable elements with placeholders
def tokenize_template(log):
    log = re.sub(r'\d+', '<NUM>', log)  # Replace numbers
    log = re.sub(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', '<IP>', log)  # Replace IPs
    log = re.sub(r'\[[^\]]*\]', '<PARAM>', log)  # Replace content inside brackets
    return log

In [4]:
# Function to apply MapReduce using PySpark
def mapreduce_log_template(file_path):
    # Initialize Spark Session
    conf = SparkConf().setAppName("LogTemplateExtraction").setMaster("local")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    
    # Read CSV file as Spark DataFrame
    df = spark.read.csv(file_path, header=True)
    
    # Function to map each log entry to (thread_id, full log message)
    def map_function(row):
        full_log = f"{row['timestamp']} {row['log_level']} {row['thread_id']} {row['group_id']} {row['module']} {row['message']}"
        return (row['thread_id'], tokenize_template(full_log))
    
    # Convert DataFrame to RDD and apply the map function
    rdd = df.rdd.map(map_function)
    
    # Group logs by thread_id
    grouped_rdd = rdd.groupByKey().mapValues(list)
    
    # Reduce function to extract unique templates per thread_id
    def reduce_function(thread_id, logs):
        unique_templates = list(set(logs))  # Remove duplicates
        return (thread_id, unique_templates)
    
    result_rdd = grouped_rdd.map(lambda x: reduce_function(x[0], x[1]))
    result_dict = dict(result_rdd.collect())
    
    return result_dict

In [5]:
# Function to save logs and templates in structured folders
def save_results(df, templates):
    for thread_id, template_list in templates.items():
        folder_name = f"thread_{thread_id}"
        os.makedirs(folder_name, exist_ok=True)  # Create folder for thread_id
        
        # Save full log messages
        full_logs = df[df['thread_id'] == thread_id]
        full_logs_path = os.path.join(folder_name, "full_logs.txt")
        full_logs.to_csv(full_logs_path, index=False, header=False, sep=' ')
        
        # Save templates (each on a new line)
        template_path = os.path.join(folder_name, "templates.txt")
        with open(template_path, "w") as f:
            f.write("\n".join(template_list))
        
        # Save summary (templates in one line)
        summary_path = os.path.join(folder_name, "summary.txt")
        with open(summary_path, "w") as f:
            f.write(" ".join(template_list))

In [6]:
# Main execution function
def main(file_path):
    df = load_csv(file_path)
    
    # Run MapReduce for log template extraction
    templates = mapreduce_log_template(file_path)
    
    # Print extracted templates for each thread_id
    print("\nExtracted Templates:")
    for tid, template_list in templates.items():
        print(f"Thread ID: {tid}\nTemplates:")
        for template in template_list:
            print(template)
    
    # Save results in structured folders
    save_results(df, templates)

In [7]:
# Run the script with CSV file
file_path = "logs.csv" 
main(file_path)

25/02/18 21:34:27 WARN Utils: Your hostname, trupti-VivoBook-ASUSLaptop-X421EAYB-K413EA resolves to a loopback address: 127.0.1.1; using 172.16.144.156 instead (on interface wlo1)
25/02/18 21:34:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/18 21:34:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/18 21:34:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/02/18 21:34:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
[Stage 1:>                                                          (0 + 1) / 1]


Extracted Templates:
Thread ID: 24
Templates:
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> INFO <NUM> <NUM> api/v<NUM>/app/instance/list Request START -- <PARAM><PARAM><PARAM><PARAM>
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> DEBUG <NUM> <NUM> api/v<NUM>/displaystrings/list Checking user authz
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> ERROR <NUM> <NUM> api/v<NUM>/alert/list Session is invalid!
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> DEBUG <NUM> <NUM> api/v<NUM>/app/instance/list reading input - payload, query params and path vars
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> DEBUG <NUM> <NUM> api/v<NUM>/user/authn performing input validation
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> INFO <NUM> <NUM> api/v<NUM>/app/instance/list Request END -- <PARAM>
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> INFO <NUM> <NUM> runtime Request END -- <PARAM>
<NUM>-<NUM>-<NUM> <NUM>:<NUM>:<NUM>,<NUM> INFO <NUM> <NUM> api/v<NUM>/user/authn Request START -- <PARAM><PARAM><PARAM><PARAM>
<NUM>-<NUM>-<NUM> <NUM>:<NU

                                                                                