In [None]:
pip install pyspark py4j

In [None]:
pip install faker

In [None]:
# Сгенерировали 100,000 записей логов и сохранили их в CSV-файл
import csv
from faker import Faker
import random

fake = Faker()

num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col, count, sum, to_date

# создаем спарк сессию
spark = SparkSession.builder.appName('records').getOrCreate()

# читаем наш файл
data_df = spark.read.csv('web_server_logs.csv', header=True, inferSchema=True)

# переводим столбец 'timestamp' в формат даты (формат 'timestamp' не пригодился)
data_df = data_df.withColumn('timestamp', to_date('timestamp', 'yyyy-mm-dd' ))

# количество запросов для каждого IP, выводим 10 самых активных IP
top_ip = data_df.groupBy('ip').count().withColumnRenamed('count', 'request_count').orderBy(col('request_count').desc()).limit(10)
top_ip.show()

# количество запросов для каждого HTTP метода
methods = data_df.groupBy('method').count().withColumnRenamed("count","method_count").orderBy(col('method_count').desc())
methods.show()

# количество запросов с кодом ответа 404
num_of_404 = data_df.filter(col('response_code') == 404).count()
print(f'Number of 404 response codes: {num_of_404}')

# сумма размеров ответов отсортированных по дате
response_size_to_date = data_df.groupBy('timestamp').agg(sum(col('response_size')).alias('response_size')).orderBy('timestamp')
response_size_to_date.show()

+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
|   88.104.65.46|            2|
| 214.177.57.171|            2|
| 114.16.181.188|            1|
|  50.247.213.23|            1|
| 51.226.248.128|            1|
| 112.135.37.215|            1|
|183.193.162.201|            1|
|    56.45.59.90|            1|
|  46.180.172.64|            1|
|  90.118.185.90|            1|
+---------------+-------------+

+------+------------+
|method|method_count|
+------+------------+
|   GET|       25062|
|   PUT|       25023|
|DELETE|       24969|
|  POST|       24946|
+------+------------+

Number of 404 response codes: 24928
+----------+-------------+
| timestamp|response_size|
+----------+-------------+
|2024-01-01|      1815940|
|2024-01-02|      1800430|
|2024-01-03|      1508636|
|2024-01-04|      1852819|
|2024-01-05|      1827931|
|2024-01-06|      1693317|
|2024-01-07|      1984794|
|2024-01-08|      1898371|
|2024-01-09|      1968440|
|2024-01-10|  