In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4bdddd4799e2c7fcc5c45f03fb4580010c5905f44de0d822c0524295e7708f04
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, date_format, rand, expr
from pyspark.sql.functions import to_timestamp, dayofyear, date_format, sum, min, max, count, desc
from pyspark.sql.functions import to_date, col

# Initializing Spark session
spark = SparkSession.builder \
    .appName("Generate Semi-Structured Logs") \
    .getOrCreate()

# Generating semi-structured log data
num_logs = 100000

logs_df = spark.range(0, num_logs) \
    .withColumn('timestamp', current_timestamp()) \
    .withColumn('remote_host', expr("concat_ws('.', floor(rand() * 256), floor(rand() * 256), floor(rand() * 256), floor(rand() * 256))")) \
    .withColumn('request_method', expr("CASE WHEN rand() < 0.25 THEN 'GET' WHEN rand() < 0.5 THEN 'POST' WHEN rand() < 0.75 THEN 'PUT' ELSE 'DELETE' END")) \
    .withColumn('request_endpoint', expr("concat('/api/', substring('abcdefghijklmnopqrstuvwxyz0123456789', floor(rand() * 25) + 1, 10))")) \
    .withColumn('protocol', expr("CASE WHEN rand() < 0.5 THEN 'HTTP/1.1' ELSE 'HTTP/2.0' END")) \
    .withColumn('status_code', expr("CASE WHEN rand() < 0.8 THEN 200 WHEN rand() < 0.9 THEN 404 ELSE 500 END")) \
    .withColumn('content_size', expr("floor(rand() * 10000)"))

# Saving the DataFrame as a Parquet file
logs_df.write.mode('overwrite').parquet('semi_structured_logs.parquet')


In [3]:
# Loading semi-structured logs data
logs_df = spark.read.parquet("semi_structured_logs.parquet")

# Displaying the DataFrame
logs_df.show(10, truncate=False)

+-----+-------------------------+--------------+--------------+----------------+--------+-----------+------------+
|id   |timestamp                |remote_host   |request_method|request_endpoint|protocol|status_code|content_size|
+-----+-------------------------+--------------+--------------+----------------+--------+-----------+------------+
|50000|2024-05-24 00:02:07.56136|232.45.190.173|GET           |/api/qrstuvwxyz |HTTP/2.0|200        |2140        |
|50001|2024-05-24 00:02:07.56136|225.254.69.125|POST          |/api/klmnopqrst |HTTP/1.1|404        |3537        |
|50002|2024-05-24 00:02:07.56136|245.115.63.26 |PUT           |/api/mnopqrstuv |HTTP/2.0|200        |3409        |
|50003|2024-05-24 00:02:07.56136|63.141.172.208|PUT           |/api/abcdefghij |HTTP/2.0|200        |9674        |
|50004|2024-05-24 00:02:07.56136|57.98.198.51  |PUT           |/api/opqrstuvwx |HTTP/1.1|404        |1553        |
|50005|2024-05-24 00:02:07.56136|121.8.112.203 |PUT           |/api/ghijklmnop |

In [4]:
# Converting and extracting timestamp information
logs_df = logs_df.withColumn('timestamp', to_timestamp('timestamp'))
logs_df = logs_df.withColumn('day', dayofyear('timestamp'))
logs_df = logs_df.withColumn('date', date_format('timestamp', 'yyyy-MM-dd'))
logs_df.select('timestamp', 'day', 'date').show(10, truncate=False)

+-------------------------+---+----------+
|timestamp                |day|date      |
+-------------------------+---+----------+
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
|2024-05-24 00:02:07.56136|145|2024-05-24|
+-------------------------+---+----------+
only showing top 10 rows



In [5]:
# Calculating statistics related to content size
# Top endpoints content
top_endpoints = logs_df.groupBy("request_endpoint").count().orderBy("count", ascending=False)
top_endpoints.show(10)

+----------------+-----+
|request_endpoint|count|
+----------------+-----+
| /api/vwxyz01234| 4099|
| /api/hijklmnopq| 4090|
| /api/bcdefghijk| 4084|
| /api/abcdefghij| 4081|
| /api/uvwxyz0123| 4079|
| /api/tuvwxyz012| 4073|
| /api/xyz0123456| 4061|
| /api/pqrstuvwxy| 4038|
| /api/fghijklmno| 4025|
| /api/mnopqrstuv| 4017|
+----------------+-----+
only showing top 10 rows



In [6]:
# Top endpoints transferring maximum content
top_endpoints_by_content = logs_df.groupBy("request_endpoint").sum("content_size") \
    .orderBy(desc("sum(content_size)")).withColumnRenamed("sum(content_size)", "total_content_size")
top_endpoints_by_content.show(10)

+----------------+------------------+
|request_endpoint|total_content_size|
+----------------+------------------+
| /api/tuvwxyz012|          20643060|
| /api/vwxyz01234|          20612552|
| /api/pqrstuvwxy|          20578403|
| /api/abcdefghij|          20547548|
| /api/bcdefghijk|          20412405|
| /api/nopqrstuvw|          20346325|
| /api/xyz0123456|          20318454|
| /api/hijklmnopq|          20302675|
| /api/efghijklmn|          20282253|
| /api/uvwxyz0123|          20126540|
+----------------+------------------+
only showing top 10 rows



In [7]:
# Daily visited content size
daily_content_size = logs_df.withColumn("date", to_date(col("timestamp"))).groupBy("date").agg(sum("content_size").alias("daily_content_size")).orderBy("date")
daily_content_size.show()

+----------+------------------+
|      date|daily_content_size|
+----------+------------------+
|2024-05-24|         498941818|
+----------+------------------+



In [8]:
# Min, Max and Count of content size
content_size_stats = logs_df.agg(min('content_size').alias('min_size'), max('content_size').alias('max_size'), count('content_size').alias('count_size'))
content_size_stats.show()

+--------+--------+----------+
|min_size|max_size|count_size|
+--------+--------+----------+
|       0|    9999|    100000|
+--------+--------+----------+



In [9]:
# Response Code Analysis
response_code_analysis = logs_df.groupBy('status_code').count().orderBy('count', ascending=False)
response_code_analysis.show()

+-----------+-----+
|status_code|count|
+-----------+-----+
|        200|80195|
|        404|17860|
|        500| 1945|
+-----------+-----+



In [10]:
# Frequent Visitors
frequent_visitors = logs_df.groupBy("remote_host").count().filter(col("count") >= 1).orderBy("count", ascending=False)
frequent_visitors.show(10)

+---------------+-----+
|    remote_host|count|
+---------------+-----+
|236.144.138.236|    1|
| 135.97.191.247|    1|
|142.173.173.225|    1|
| 235.34.254.148|    1|
|161.201.221.123|    1|
| 233.63.198.119|    1|
|  56.236.45.247|    1|
|  27.62.138.219|    1|
|223.255.230.118|    1|
|   201.148.1.65|    1|
+---------------+-----+
only showing top 10 rows



In [11]:
# Identifying IP addresses accessing the server more than 10 times
frequent_ip_addresses = logs_df.groupBy('remote_host').agg(count('*').alias('access_count')).filter('access_count > 10').orderBy(desc('access_count'))
frequent_ip_addresses.show()

+-----------+------------+
|remote_host|access_count|
+-----------+------------+
+-----------+------------+



In [12]:
# Analyzing bad requests including the top 10 latest 404 requests with their endpoints and time
latest_404_requests = logs_df.filter(col("status_code") == 404).orderBy(col("timestamp"),\
                       ascending=False).select("timestamp", "request_endpoint").limit(10)
latest_404_requests.show(truncate=False)

+-------------------------+----------------+
|timestamp                |request_endpoint|
+-------------------------+----------------+
|2024-05-24 00:02:07.56136|/api/cdefghijkl |
|2024-05-24 00:02:07.56136|/api/klmnopqrst |
|2024-05-24 00:02:07.56136|/api/cdefghijkl |
|2024-05-24 00:02:07.56136|/api/opqrstuvwx |
|2024-05-24 00:02:07.56136|/api/fghijklmno |
|2024-05-24 00:02:07.56136|/api/defghijklm |
|2024-05-24 00:02:07.56136|/api/hijklmnopq |
|2024-05-24 00:02:07.56136|/api/cdefghijkl |
|2024-05-24 00:02:07.56136|/api/hijklmnopq |
|2024-05-24 00:02:07.56136|/api/rstuvwxyz0 |
+-------------------------+----------------+



In [13]:
# Calculating percentage of successful requests
total_requests = logs_df.count()
successful_requests = logs_df.filter(col("status_code") == 200).count()
error_requests = total_requests - successful_requests

# Calculating success and error percentages
success_percentage = (successful_requests / total_requests) * 100
error_percentage = 100 - success_percentage

# Defining thresholds for anomaly detection
success_threshold = 95  # Threshold for successful requests percentage
error_threshold = 5  # Threshold for error requests percentage

# Transformation for anomaly detection
def detect_anomalies(success_percentage, error_percentage):
    if success_percentage < success_threshold:
        return "Anomaly: Low success rate detected"
    elif error_percentage > error_threshold:
        return "Anomaly: High error rate detected"
    else:
        return "No anomalies detected"

# Applying transformation to detect anomalies
anomaly_detection_result = detect_anomalies(success_percentage, error_percentage)

# Printing analysis results and anomaly detection
print(f"Successful Requests: {success_percentage:.2f}%")
print(f"Error Requests: {error_percentage:.2f}%")
print(anomaly_detection_result)

Successful Requests: 80.20%
Error Requests: 19.80%
Anomaly: Low success rate detected


In [14]:
spark.stop()
