In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, regexp_extract, to_timestamp, window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# Start (or reuse) a Spark session that runs locally with one worker thread
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Nginx Log Analysis")
    .getOrCreate()
)

Example Nginx access log line:
127.0.0.1 - - [10/Jul/2024:22:14:15 +0000] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

In [None]:
# Location of the Nginx access log to analyse
log_file = "data/access.log"

# Load the file as a one-column DataFrame: one row per line, in column `value`
raw_lines_df = spark.read.text(log_file)

# Regex describing one access-log line; the capture groups are picked by index below
log_pattern = r'(\S+) (\S+) (\S+) \[(\S+ +\S+)\] "(\S+) (\S+)\s*(\S*) (\S*)" (\d{3}) (\d+) "(.*?)" "(.*?)"'

# Capture-group index -> output column name, for the fields kept for analysis
field_groups = {
    1: 'ip',
    4: 'timestamp',
    5: 'method',
    6: 'endpoint',
    9: 'status',
    10: 'content_size',
}

parsed_columns = [
    regexp_extract('value', log_pattern, group_idx).alias(column_name)
    for group_idx, column_name in field_groups.items()
]

# content_size is extracted as text, so cast it to an integer column
logs_df = raw_lines_df.select(*parsed_columns).withColumn(
    "content_size", col("content_size").cast("integer")
)
logs_df.show(10, truncate=False)

Let's add the features needed to detect anomalies:

The number of requests from the same IP address in different time windows.
Average response size for each IP address in different time windows.

In [None]:
# Convert the timestamp text (e.g. "10/Jul/2024:22:14:15 +0000") into a timestamp column.
# NOTE: this rebinds `logs_df`, so re-run the parsing cell before re-running this one;
# otherwise the already-converted column is parsed a second time.
logs_df = logs_df.withColumn('timestamp', to_timestamp(logs_df.timestamp, 'dd/MMM/yyyy:HH:mm:ss Z'))

# Aggregate per IP address over 5-minute windows.
# `count` and `avg` must be imported from pyspark.sql.functions; without that import
# this cell fails with NameError on a fresh kernel.
windowed_logs_df = (
    logs_df
    .groupBy(window("timestamp", "5 minutes"), "ip")
    .agg(
        count("ip").alias("request_count"),
        avg("content_size").alias("avg_content_size"),
    )
)

windowed_logs_df.show(10, truncate=False)

In [None]:
# Numeric columns that make up the feature vector of each (window, ip) row
feature_columns = ["request_count", "avg_content_size"]

# Pack those columns into a single vector column named `features`.
# NOTE(review): the two features are not scaled here and may be on very different
# ranges; consider standardizing before distance-based clustering.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

feature_vector = assembler.transform(windowed_logs_df)
feature_vector.show(10, truncate=False)

Anomaly Detection Using KMeans
Let's use the KMeans algorithm to detect anomalies

In [None]:
# Clustering hyper-parameters; the number of clusters k can be tuned
N_CLUSTERS = 2
KMEANS_SEED = 1

# Fit KMeans on the assembled feature vectors only
kmeans = KMeans(k=N_CLUSTERS, seed=KMEANS_SEED)
features_only = feature_vector.select("features")
model = kmeans.fit(features_only)

# Attach a cluster id (column `prediction`) to every aggregated row
predictions = model.transform(feature_vector)

# Score the clustering using the evaluator's default settings
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

# Detecting anomalies: show how many rows landed in each cluster
cluster_sizes = predictions.groupBy("prediction").count()
cluster_sizes.show()

In [None]:
# Collect the clustered rows to the driver as a pandas DataFrame for plotting
predictions_pd = predictions.toPandas()

# Scatter of the two features, coloured by assigned cluster
fig, ax = plt.subplots(figsize=(12, 8))
points = ax.scatter(
    predictions_pd["request_count"],
    predictions_pd["avg_content_size"],
    c=predictions_pd["prediction"],
    cmap="viridis",
)
ax.set_xlabel("Request Count")
ax.set_ylabel("Average Content Size")
ax.set_title("Nginx Log Clustering")
fig.colorbar(points, ax=ax, label="Cluster")
plt.show()