# Batch vs Streaming обработка данных

Этот notebook демонстрирует различия между batch и streaming обработкой данных.


## 1. Batch обработка

Batch обработка - это обработка больших объемов данных за определенный период времени.

### Характеристики:
- Обработка исторических данных
- Выполняется по расписанию (например, раз в день)
- Обрабатывает все данные сразу
- Более эффективна для больших объемов
- Результаты доступны после завершения обработки


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, max

# Создаем Spark Session
spark = SparkSession.builder \
    .appName("Batch Processing Example") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

# Читаем исторические данные из S3
batch_df = spark.read.parquet("s3a://raw-data/flights/*.parquet")

print(f"Загружено {batch_df.count()} записей для batch обработки")

# Агрегация по странам
batch_stats = batch_df \
    .groupBy("origin_country") \
    .agg(
        count("*").alias("total_flights"),
        avg("velocity").alias("avg_velocity"),
        max("velocity").alias("max_velocity")
    ) \
    .orderBy("total_flights", ascending=False)

batch_stats.show(10)


## 2. Streaming обработка

Streaming обработка - это обработка данных в реальном времени по мере их поступления.

### Характеристики:
- Обработка данных в реальном времени
- Непрерывная обработка
- Обрабатывает данные небольшими батчами
- Результаты доступны сразу
- Требует больше ресурсов


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json

# Создаем Spark Session
spark = SparkSession.builder \
    .appName("Streaming Processing Example") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

# Читаем поток из Kafka
streaming_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "raw_flights") \
    .option("startingOffsets", "latest") \
    .load()

# Парсим JSON
schema = StructType([
    StructField("icao24", StringType()),
    StructField("origin_country", StringType()),
    StructField("velocity", DoubleType())
])

flights = streaming_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Агрегация по окнам времени (5 минут)
windowed_stats = flights \
    .withWatermark("processing_time", "1 minute") \
    .groupBy(
        window(col("processing_time"), "5 minutes"),
        col("origin_country")
    ) \
    .agg(
        count("*").alias("flight_count"),
        avg("velocity").alias("avg_velocity")
    )

# Выводим результаты в консоль (для демонстрации)
query = windowed_stats \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()


## 3. Сравнение

| Характеристика | Batch | Streaming |
|----------------|-------|-----------|
| Задержка | Высокая (часы/дни) | Низкая (секунды/минуты) |
| Объем данных | Большой | Небольшие батчи |
| Ресурсы | Высокие, но периодические | Постоянные |
| Сложность | Проще | Сложнее |
| Использование | Исторический анализ | Мониторинг в реальном времени |
