In [1]:
pip install faker

Collecting faker
  Downloading Faker-26.1.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-26.1.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m7.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-26.1.0


In [2]:
import csv
from faker import Faker
import random

fake = Faker()

num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

Сгенерировано 100000 записей и сохранено в web_server_logs.csv


Логи содержат следующую информацию:

IP-адрес клиента
Временная метка запроса
HTTP-метод (GET, POST, etc.)
URL запроса
Код ответа (200, 404, etc.)
Размер ответа в байтах

Часть 2. Анализ информации
1. Сгруппируйте данные по IP и посчитайте количество запросов для каждого IP, выводим 10 самых активных IP. Формат вывода, как на скрине ниже.

2. Сгруппируйте данные по HTTP-методу и посчитайте количество запросов для каждого метода.

3. Профильтруйте и посчитайте количество запросов с кодом ответа 404.

4. Сгруппируйте данные по дате и просуммируйте размер ответов, сортируйте по дате.

In [4]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=ae332a38050353054140fec063f40cf02e02478c84e10ffc9455f56f5d1100fc
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, count, sum as spark_sum, date_format, to_date
from pyspark.sql.functions import expr

In [27]:
spark = SparkSession.builder.appName("анализ логов").getOrCreate()

In [28]:
log_file_path = "web_server_logs.csv"

# Определяем схему данных
log_schema = StructType([
    StructField("ip", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("method", StringType(), True),
    StructField("url", StringType(), True),
    StructField("response_code", IntegerType(), True),
    StructField("response_size", IntegerType(), True)
])

# Читаем файл лога
logs_df = spark.read.csv(log_file_path, schema=log_schema, header=True)

# Выводим несколько строк для проверки
logs_df.show(5)

+---------------+--------------------+------+--------------------+-------------+-------------+
|             ip|           timestamp|method|                 url|response_code|response_size|
+---------------+--------------------+------+--------------------+-------------+-------------+
| 121.235.11.116|2024-05-25T01:58:...|   PUT|        app/category|          200|         1601|
| 196.177.238.22|2024-05-18T23:04:...|  POST| search/list/explore|          301|         7011|
|180.177.201.101|2024-03-13T13:23:...|DELETE|wp-content/wp-con...|          200|         2226|
|113.156.143.223|2024-05-10T19:12:...|   GET|               posts|          301|         2773|
| 129.120.186.97|2024-02-07T05:30:...|  POST|                list|          404|         2825|
+---------------+--------------------+------+--------------------+-------------+-------------+
only showing top 5 rows



In [29]:
# 1. Сгруппировать данные по IP и посчитать количество запросов для каждого IP, выводим 10 самых активных IP.
ip_requests_df = logs_df.groupBy("ip").agg(count("*").alias("request_count")).orderBy(col("request_count").desc())
top_10_ip_df = ip_requests_df.limit(10)
top_10_ip_df.show()

+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
|   147.25.90.30|            2|
| 98.211.250.135|            1|
|    14.133.2.50|            1|
| 220.191.200.55|            1|
|174.148.172.219|            1|
|121.189.136.234|            1|
|  165.65.11.174|            1|
| 14.220.169.159|            1|
|109.233.136.174|            1|
|  217.249.85.97|            1|
+---------------+-------------+



In [30]:
# 2. Сгруппировать данные по HTTP-методу и посчитать количество запросов для каждого метода.
method_requests_df = logs_df.groupBy("method").agg(count("*").alias("method_count"))
method_requests_df.show()

+------+------------+
|method|method_count|
+------+------------+
|  POST|       24956|
|DELETE|       24984|
|   PUT|       25085|
|   GET|       24975|
+------+------------+



In [43]:
# 3. Профильтровать и посчитать количество запросов с кодом ответа 404.

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
logs_df = logs_df.withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd"))

logs_df = logs_df.withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd"))

response_404_df = logs_df.filter(col("response_code") == 404)

total_404_requests = response_404_df.count()
print(f"Number of 404 response codes: {total_404_requests}")

# response_404_by_date_df = response_404_df.groupBy("date").agg(count("*").alias("total_404_requests")).orderBy("date")
# response_404_by_date_df.show(5)

date_response_size_df = logs_df.groupBy("date").agg(spark_sum("response_size").alias("total_response_size")).orderBy("date")
date_response_size_df.show(5)



Number of 404 response codes: 24956
+----------+-------------------+
|      date|total_response_size|
+----------+-------------------+
|2024-01-01|            2309303|
|2024-01-02|            2463991|
|2024-01-03|            2347753|
|2024-01-04|            2451492|
|2024-01-05|            2332783|
+----------+-------------------+
only showing top 5 rows



In [19]:
# 4. Сгруппировать данные по дате и просуммировать размер ответов, сортируем по дате.
logs_df = logs_df.withColumn("date", date_format(col("timestamp"), "yyyy-MM-dd"))
date_response_size_df = logs_df.groupBy("date").agg(spark_sum("response_size").alias("total_response_size")).orderBy("date")
date_response_size_df.show()

+----------+-------------------+
|      date|total_response_size|
+----------+-------------------+
|2024-01-01|            2309303|
|2024-01-02|            2463991|
|2024-01-03|            2347753|
|2024-01-04|            2451492|
|2024-01-05|            2332783|
|2024-01-06|            2337486|
|2024-01-07|            2347051|
|2024-01-08|            2473530|
|2024-01-09|            2383131|
|2024-01-10|            2237969|
|2024-01-11|            2273698|
|2024-01-12|            2246762|
|2024-01-13|            2205090|
|2024-01-14|            2423287|
|2024-01-15|            2326342|
|2024-01-16|            2218911|
|2024-01-17|            2286162|
|2024-01-18|            2200574|
|2024-01-19|            2233778|
|2024-01-20|            2226129|
+----------+-------------------+
only showing top 20 rows



In [44]:
# стоп
spark.stop()