In [None]:
pip --version

pip 24.1.2 from /usr/local/lib/python3.12/dist-packages/pip (python 3.12)


In [None]:
!pip install pyspark py4j



In [None]:
from pyspark import SparkContext, SparkConf



# Настройка Spark

conf = SparkConf().setAppName("Simple RDD Example").setMaster("local[*]")

sc = SparkContext(conf=conf)



# 1. Создание RDD из списка чисел

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

rdd = sc.parallelize(numbers)



# 2. Трансформации: Фильтрация чётных чисел

even_numbers_rdd = rdd.filter(lambda x: x % 2 == 0)



# 3. Действие: Подсчёт суммы чётных чисел

sum_even_numbers = even_numbers_rdd.sum()



# Вывод результата

print("Чётные числа:", even_numbers_rdd.collect())

print("Сумма чётных чисел:", sum_even_numbers)



a = 5 + 6

print(a)

# Остановка SparkContext

sc.stop()

Чётные числа: [2, 4, 6, 8, 10]
Сумма чётных чисел: 30
11


In [None]:
from pyspark import SparkContext

# Создание объекта SparkContext
sc = SparkContext(appName="MySparkApp")

# Здесь можно выполнять операции с RDD

# Закрытие SparkContext
sc.stop()

In [None]:
from pyspark.sql import SparkSession

# Создание объекта SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()

# Здесь можно выполнять операции с DataFrames
# Закрытие SparkSession
spark.stop()

In [None]:
from pyspark.sql import SparkSession

# Создание Spark Session с различными параметрами конфигурации
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.checkpoint.dir", "/path/to/checkpoint/dir") \
    .config("spark.sql.warehouse.dir", "/path/to/warehouse/dir") \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

# Пример использования Spark Session
df = spark.read.json("/path/to/json/file")
df.show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]

# Явное определение схемы
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Value", IntegerType(), True)
])

# Создание DataFrame с явной схемой
df = spark.createDataFrame(data, schema)
df.printSchema()

# Автоматическое определение схемы при чтении данных из CSV
df_auto = spark.read.csv("/path/to/csv/file", header=True, inferSchema=True)
df_auto.printSchema()

In [None]:
#####  Итоговое задание по PySpark

In [1]:
!pip install pyspark py4j



In [2]:
!pip install faker

Collecting faker
  Downloading faker-39.0.0-py3-none-any.whl.metadata (16 kB)
Downloading faker-39.0.0-py3-none-any.whl (2.0 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.0/2.0 MB[0m [31m64.9 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m31.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-39.0.0


In [3]:
import csv
from faker import Faker
import random

fake = Faker()

num_records = 100000

response_codes = [200, 301, 404, 500]
http_methods = ['GET', 'POST', 'PUT', 'DELETE']

file_path = "web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for i in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

Сгенерировано 100000 записей и сохранено в web_server_logs.csv


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


spark = SparkSession.builder.appName('ReadCSV').getOrCreate()

df = spark.read.csv('/content/web_server_logs.csv', header=True, inferSchema=True)

grouped_df_ip = df.groupBy('ip').agg({"ip": "count"}).withColumnRenamed("count(ip)", "request_count")

print(f'Top 10 active IP adresses:')
grouped_df_ip.show(10)

print(f'Request count by HTTP method:')
grouped_df_method = df.groupBy('method').agg({'method':'count'}).withColumnRenamed('count(method)', 'method_count')

grouped_df_method.show()

filtered_df_bad = df.filter(col("response_code") == 404)
count_filtererd = filtered_df_bad.agg({'response_code':'count'}).withColumnRenamed('count(response_code)', 'Count of 404 responce codes:')
count_filtererd.show()

group_date = df.groupBy('timestamp').agg({'response_size':'sum'}).withColumnRenamed('sum(response_size)', 'total_responce_size').withColumnRenamed('timestamp', 'date')
ordered = group_date.orderBy(col('date'))
print(f'Total responce size by day:')
ordered.show()

spark.stop()

Top 10 active IP adresses:
+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
| 189.144.214.66|            1|
|   173.177.90.5|            1|
| 46.209.154.122|            1|
|   12.48.98.147|            1|
|  218.245.85.94|            1|
|  16.225.39.147|            1|
| 189.75.194.222|            1|
|101.241.127.182|            1|
|   222.27.68.45|            1|
| 145.10.146.148|            1|
+---------------+-------------+
only showing top 10 rows
Request count by HTTP method:
+------+------------+
|method|method_count|
+------+------------+
|  POST|       25152|
|DELETE|       24941|
|   PUT|       24801|
|   GET|       25106|
+------+------------+

+----------------------------+
|Count of 404 responce codes:|
+----------------------------+
|                       25275|
+----------------------------+

Total responce size by day:
+--------------------+-------------------+
|                date|total_responce_size|
+--------------------+-