In [1]:
from pyspark.sql import SparkSession
import csv
import random
from faker import Faker

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# generate data
fake = Faker()
num_records = 100000
http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]
file_path = "web_server_logs.csv"


with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Сгенерировано 100000 записей и сохранено в web_server_logs.csv

In [3]:
spark = SparkSession.builder.appName("spark hw").getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df = spark.read.option("header", True).csv("web_server_logs.csv")
df.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+--------------------+------+---------------+-------------+-------------+
|           ip|           timestamp|method|            url|response_code|response_size|
+-------------+--------------------+------+---------------+-------------+-------------+
| 147.42.78.89|2024-05-12T10:01:...|   GET|   main/explore|          200|         8076|
|190.220.32.85|2024-05-20T19:01:...|DELETE|app/tags/search|          200|         1643|
| 4.164.111.78|2024-06-11T02:48:...|DELETE|           blog|          200|         3242|
+-------------+--------------------+------+---------------+-------------+-------------+
only showing top 3 rows

In [5]:
# 1. Сгруппируйте данные по IP и посчитайте количество запросов для каждого IP, выводим 10 самых активных IP
from pyspark.sql.functions import desc, asc

grouped_df = df.select('ip', 'url').groupby('ip').count().withColumnRenamed('count', 'request_count')
print('Top 10 active IP addresses: ')
grouped_df.sort(grouped_df['request_count'].desc()).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Top 10 active IP addresses: 
+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
|     23.8.77.59|            2|
|     40.7.5.100|            2|
|  33.219.76.106|            1|
|213.186.138.123|            1|
| 117.72.182.118|            1|
| 110.149.94.174|            1|
|   118.4.49.188|            1|
|139.160.220.134|            1|
|  156.69.248.44|            1|
|206.115.227.235|            1|
+---------------+-------------+
only showing top 10 rows

In [6]:
# 2. Сгруппируйте данные по HTTP-методу и посчитайте количество запросов для каждого метода.
grouped_df = df.select('method', 'url').groupby('method').count().withColumnRenamed('count', 'method_count')
print('Request count by HTTP method: ')
grouped_df.sort(grouped_df['method_count'].desc()).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Request count by HTTP method: 
+------+------------+
|method|method_count|
+------+------------+
|   PUT|       25024|
|   GET|       25020|
|  POST|       24987|
|DELETE|       24969|
+------+------------+

In [7]:
#3. Профильтруйте и посчитайте количество запросов с кодом ответа 404.
result = df.filter(df['response_code']==404).count()
print(f'Number of 404 response codes is {result}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of 404 response codes is 24945

In [8]:
from pyspark.sql.types import DateType, IntegerType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
#4. Сгруппируйте данные по дате и просуммируйте размер ответов, сортируйте по дате.

df = (
    df
    .withColumn('date', df['timestamp'].cast("date"))
    .withColumn('response_size', df['response_size'].cast("int"))
)

grouped_df = df.select('date', 'response_size').groupby('date').sum().withColumnRenamed('sum(response_size)', 'total_response_size')
print('Total response size by day: ')
grouped_df.sort(grouped_df['date'].asc()).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total response size by day: 
+----------+-------------------+
|      date|total_response_size|
+----------+-------------------+
|2024-01-01|            1941839|
|2024-01-02|            1763470|
|2024-01-03|            1811044|
|2024-01-04|            1859343|
|2024-01-05|            2012067|
|2024-01-06|            1909545|
|2024-01-07|            1849481|
|2024-01-08|            1849345|
|2024-01-09|            1887913|
|2024-01-10|            1945081|
+----------+-------------------+
only showing top 10 rows