In [2]:
import findspark
findspark.init()

import os
import sys
import json
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

In [3]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, CountVectorizer, IDF, 
    StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler,
    MinMaxScaler, RegexTokenizer
)
from pyspark.ml.classification import (
    NaiveBayes, LinearSVC, LogisticRegression
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Feature Engineering

In [4]:
# Project layout. The root defaults to the original development location; set the
# LOG_ANOMALY_PROJECT_ROOT environment variable to run the notebook from another checkout.
PROJECT_ROOT = Path(
    os.environ.get("LOG_ANOMALY_PROJECT_ROOT")
    or r"C:\Computer Science\AIMLDL\log-anomaly-detection"
)
DATASET_PATH = PROJECT_ROOT / "dataset"
PROCESSED_DATA_PATH = DATASET_PATH / "processed"
RESULTS_PATH = PROJECT_ROOT / "results"

print(f"Project Root: {PROJECT_ROOT}")
print(f"Processed Data: {PROCESSED_DATA_PATH}")


def prepend_to_path(directory):
    """Put `directory` at the front of the process PATH unless it is already listed.

    Re-running the cell therefore leaves PATH unchanged instead of growing it
    by one duplicate entry per execution.

    Args:
        directory: Directory to add, as a string.
    """
    current_path = os.environ.get('PATH', '')
    if directory in current_path.split(os.pathsep):
        return
    os.environ['PATH'] = f"{directory}{os.pathsep}{current_path}" if current_path else directory


# Keep a HADOOP_HOME that is already configured on this machine; fall back to C:\hadoop otherwise.
os.environ.setdefault('HADOOP_HOME', 'C:\\hadoop')
prepend_to_path(os.path.join(os.environ['HADOOP_HOME'], 'bin'))

Project Root: C:\Computer Science\AIMLDL\log-anomaly-detection
Processed Data: C:\Computer Science\AIMLDL\log-anomaly-detection\dataset\processed


Spark Session

In [5]:
# Build (or reuse) the local Spark session used by the rest of the notebook.
# NOTE(review): getOrCreate() hands back an already-running session when one exists;
# presumably JVM-level settings such as driver memory only apply on first creation — confirm.
spark = (
    SparkSession.builder
    .master("local[*]")
    # Driver resources
    .config("spark.driver.memory", "18g")
    .config("spark.driver.maxResultSize", "4g")
    # Adaptive query execution and shuffle sizing
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.shuffle.partitions", "8")
    # NOTE(review): this key does not look like a documented Spark SQL option —
    # confirm it has any effect before relying on it.
    .config("spark.sql.adaptive.coalescePartitions.maxRecords", "1000000")
    # Arrow transfer, serialization and parallelism
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.default.parallelism", "16")
    # Notebook display of DataFrames
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .config("spark.sql.repl.eagerEval.maxNumRows", "20")
    .appName("LogAnomalyFeatureEngineering")
    .getOrCreate()
)

# Only warnings and errors from Spark reach the notebook output.
spark.sparkContext.setLogLevel("WARN")

# Echo the effective settings so the recorded output documents the run.
print("Enhanced Spark session initialized")
print(f"Driver memory: {spark.conf.get('spark.driver.memory')}")
print(f"Shuffle partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

Enhanced Spark session initialized
Driver memory: 18g
Shuffle partitions: 8


In [6]:
def load_eda_metadata(metadata_file=None):
    """Load the EDA metadata JSON file.

    Args:
        metadata_file: Optional path (str or Path) to the JSON file. When omitted,
            RESULTS_PATH / "eda_metadata.json" is used.

    Returns:
        The parsed JSON content, or None (after printing a notice) when the
        file does not exist.
    """
    if metadata_file is None:
        metadata_file = RESULTS_PATH / "eda_metadata.json"
    metadata_file = Path(metadata_file)

    if metadata_file.exists():
        # JSON is UTF-8 by specification; without an explicit encoding open()
        # falls back to the locale codec, which breaks on non-ASCII text on Windows.
        with open(metadata_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    else:
        print("EDA metadata not found.")
        return None

def load_processed_datasets():
    """Read every "*_processed.parquet" under PROCESSED_DATA_PATH into Spark.

    Files are visited in sorted path order so the resulting dict (and anything
    built from its iteration order) is the same on every run. A dataset that
    fails to load is reported and skipped rather than aborting the whole load.

    Returns:
        dict mapping dataset name (file stem without the trailing
        "_processed") to the loaded Spark DataFrame.
    """
    name_suffix = "_processed"
    # Path.glob yields entries in no guaranteed order; sort for reproducibility.
    processed_files = sorted(PROCESSED_DATA_PATH.glob("*_processed.parquet"))
    print(f"Found {len(processed_files)} processed datasets")

    datasets = {}
    for file_path in processed_files:
        stem = file_path.stem
        # Strip only the trailing marker; str.replace would also remove
        # "_processed" occurring earlier in the name.
        dataset_name = stem[:-len(name_suffix)] if stem.endswith(name_suffix) else stem
        try:
            df = spark.read.parquet(str(file_path))
            datasets[dataset_name] = df
            # df.count() runs a Spark job; it is kept for the per-dataset summary line.
            print(f"Loaded {dataset_name}: {df.count():,} rows × {len(df.columns)} columns")
        except Exception as e:
            print(f"Error loading {dataset_name}: {e}")

    return datasets

# Load the inputs for this notebook.
print("\nLoading EDA results and processed datasets...")
# eda_metadata is None when results/eda_metadata.json is absent.
eda_metadata = load_eda_metadata()
# datasets maps each dataset name to its Spark DataFrame; later cells read it.
datasets = load_processed_datasets()


Loading EDA results and processed datasets...
Found 16 processed datasets
Loaded Android_2k.log_structured: 2,000 rows × 10 columns
Loaded Apache_2k.log_structured: 2,000 rows × 6 columns
Loaded BGL_2k.log_structured: 2,000 rows × 13 columns
Loaded Hadoop_2k.log_structured: 2,000 rows × 9 columns
Loaded HDFS_2k.log_structured: 2,000 rows × 9 columns
Loaded HealthApp_2k.log_structured: 2,000 rows × 7 columns
Loaded HPC_2k.log_structured: 2,000 rows × 10 columns
Loaded Linux_2k.log_structured: 2,000 rows × 10 columns
Loaded Mac_2k.log_structured: 2,000 rows × 11 columns
Loaded OpenSSH_2k.log_structured: 2,000 rows × 9 columns
Loaded OpenStack_2k.log_structured: 2,000 rows × 11 columns
Loaded Proxifier_2k.log_structured: 2,000 rows × 6 columns
Loaded Spark_2k.log_structured: 2,000 rows × 8 columns
Loaded Thunderbird_2k.log_structured: 2,000 rows × 14 columns
Loaded Windows_2k.log_structured: 2,000 rows × 8 columns
Loaded Zookeeper_2k.log_structured: 2,000 rows × 10 columns


In [1]:
def create_unified_schema():
    """Describe the column layout shared by all harmonized log sources.

    Returns:
        StructType listing the ten unified columns in order. Every field is
        nullable except "content".
    """
    return StructType([
        StructField("source_system", StringType(), True),      # Log source identifier
        StructField("line_id", IntegerType(), True),           # Original LineId
        StructField("timestamp", TimestampType(), True),       # Standardized timestamp
        StructField("content", StringType(), False),           # Log content (primary feature)
        StructField("event_id", StringType(), True),           # Event identifier
        StructField("event_template", StringType(), True),     # Event template
        StructField("level", StringType(), True),              # Log level (when available)
        StructField("component", StringType(), True),          # Component (when available)
        StructField("content_length", IntegerType(), True),    # Content character count
        StructField("word_count", IntegerType(), True)         # Word count
    ])

def harmonize_dataset(df, source_name):
    """Project one source DataFrame onto the unified column layout.

    The source is tagged with `source_name`, its known columns are renamed to
    snake_case, optional columns it lacks are added as typed nulls, and two
    derived features (content_length, word_count) are computed. The result
    always has the same ten columns in the same order, so per-source outputs
    can be unioned safely.

    Args:
        df: Spark DataFrame for a single log source.
        source_name: Identifier stored in the "source_system" column.

    Returns:
        Spark DataFrame with columns source_system, line_id, timestamp,
        content, event_id, event_template, level, component, content_length,
        word_count.

    Raises:
        ValueError: If the source has no content column; the derived text
            features cannot be computed without it.
    """
    print(f"Harmonizing {source_name}...")

    column_mapping = {
        'LineId': 'line_id',
        'Time': 'timestamp',
        'Content': 'content',
        'EventId': 'event_id',
        'EventTemplate': 'event_template',
        'Level': 'level',
        'Component': 'component'
    }

    # Type given to a column when the source does not provide it.
    # NOTE(review): types follow create_unified_schema(); the real sources are
    # not cast here, so confirm these agree with the parquet column types.
    optional_column_types = {
        'line_id': IntegerType(),
        'timestamp': TimestampType(),
        'event_id': StringType(),
        'event_template': StringType(),
        'level': StringType(),
        'component': StringType(),
    }

    harmonized = df.withColumn("source_system", F.lit(source_name))

    for old_col, new_col in column_mapping.items():
        if old_col in harmonized.columns:
            harmonized = harmonized.withColumnRenamed(old_col, new_col)

    if 'content' not in harmonized.columns:
        raise ValueError(f"{source_name}: no 'Content' column to derive text features from")

    # Fill every missing optional column (not just level/component) so all
    # sources end up with an identical column list.
    for col_name, col_type in optional_column_types.items():
        if col_name not in harmonized.columns:
            harmonized = harmonized.withColumn(col_name, F.lit(None).cast(col_type))

    # word_count splits on single spaces, so runs of spaces produce empty
    # tokens that are included in the count.
    harmonized = harmonized \
        .withColumn("content_length", F.length("content")) \
        .withColumn("word_count", F.size(F.split(F.col("content"), " ")))

    unified_columns = ['source_system', 'line_id', 'timestamp', 'content',
                       'event_id', 'event_template', 'level', 'component',
                       'content_length', 'word_count']

    return harmonized.select(*unified_columns)

# Requires the `datasets` dict produced by the loading cell; run that cell first.
print(f"\nCreating unified dataset from {len(datasets)} sources...")

# Fail with a clear message instead of an IndexError on unified_dfs[0] below.
if not datasets:
    raise ValueError("No processed datasets were loaded; nothing to unify.")

unified_dfs = []

for dataset_name, df in datasets.items():
    harmonized_df = harmonize_dataset(df, dataset_name)
    unified_dfs.append(harmonized_df)

# union() matches columns by position; unionByName() matches them by name and
# raises if the column sets differ, so a mismatch cannot silently misalign data.
unified_df = unified_dfs[0]
for df in unified_dfs[1:]:
    unified_df = unified_df.unionByName(df)

print(f"Unified dataset created: {unified_df.count():,} total rows")
print("Source distribution:")
unified_df.groupBy("source_system").count().orderBy("count", ascending=False).show()

NameError: name 'datasets' is not defined