In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, max, min, sum, desc, from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("API Logs Anomaly Detection").getOrCreate()

In [0]:
# Define schema for Apache Access logs
schema = StructType([
    StructField("ip", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("method", StringType(), True),
    StructField("endpoint", StringType(), True),
    StructField("protocol", StringType(), True),
    StructField("response_code", IntegerType(), True),
    StructField("content_size", LongType(), True)
])

In [0]:
# Load data from parquet file
logs_df = spark.read.schema(schema).parquet("/FileStore/tables/apache_logs__1_.parquet")

# Convert timestamp to human-readable format
logs_df = logs_df.withColumn("timestamp", from_unixtime(col("timestamp")))

# Content Size Statistics
content_stats = logs_df.agg(
    min("content_size").alias("min_content_size"),
    max("content_size").alias("max_content_size"),
    count("content_size").alias("count_content_size"),
    sum("content_size").alias("total_content_size")
)

content_stats.show()

+----------------+----------------+------------------+------------------+
|min_content_size|max_content_size|count_content_size|total_content_size|
+----------------+----------------+------------------+------------------+
|             100|           10000|            100000|         504987157|
+----------------+----------------+------------------+------------------+



In [0]:
# Top Endpoints by Content Size
top_endpoints = logs_df.groupBy("endpoint").sum("content_size").alias("total_content_size").orderBy(desc("sum(content_size)"))
top_endpoints.show(10)

# Daily Content Size Statistics
daily_content_size = logs_df.groupBy(from_unixtime(col("timestamp"), "yyyy-MM-dd").alias("day")).sum("content_size").alias("daily_content_size").orderBy("day")
daily_content_size.show()

+--------+-----------------+
|endpoint|sum(content_size)|
+--------+-----------------+
|    null|        504987157|
+--------+-----------------+

+----+-----------------+
| day|sum(content_size)|
+----+-----------------+
|null|        504987157|
+----+-----------------+



In [0]:
# Daily Content Size Statistics
daily_content_size = logs_df.groupBy(from_unixtime(col("timestamp"), "yyyy-MM-dd").alias("day")).sum("content_size").alias("daily_content_size").orderBy("day")
daily_content_size.show()

# Top Visited Endpoints
#from pyspark.sql.functions import desc

# Top Visited Endpoints
top_visited_endpoints = logs_df.groupBy("endpoint").count().alias("visit_count").orderBy(desc("count"))
top_visited_endpoints.show(10)

# Frequent Visitors
frequent_visitors = logs_df.groupBy("ip").count().alias("visit_count").orderBy(desc("count"))
frequent_visitors.show(10)

+----+-----------------+
| day|sum(content_size)|
+----+-----------------+
|null|        504987157|
+----+-----------------+

+--------+------+
|endpoint| count|
+--------+------+
|    null|100000|
+--------+------+

+----+------+
|  ip| count|
+----+------+
|null|100000|
+----+------+



In [0]:
## Response Code Analysis
response_code_analysis = logs_df.groupBy("response_code").count().alias("count").orderBy(desc("count"))
response_code_analysis.show()

# IPs accessing the server more than 10 times
frequent_ip_accesses = logs_df.groupBy("ip").count().alias("visit_count").filter(col("count") > 10).orderBy(desc("count"))
frequent_ip_accesses.show()

+-------------+------+
|response_code| count|
+-------------+------+
|         null|100000|
+-------------+------+

+----+------+
|  ip| count|
+----+------+
|null|100000|
+----+------+



In [0]:
# Top 10 latest 404 requests
latest_404_requests = logs_df.filter(col("response_code") == 404).orderBy(desc("timestamp")).select("timestamp", "endpoint").limit(10)
latest_404_requests.show()

+---------+--------+
|timestamp|endpoint|
+---------+--------+
+---------+--------+



In [0]:
# Stop Spark session
spark.stop()