<a href="https://colab.research.google.com/github/nephelim74/spark/blob/main/spark_dz4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark >> None

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

In [3]:
# Создание объекта SparkSession
spark = SparkSession.builder.appName("Streaming Odd Number Sum").getOrCreate()

In [5]:
# Создание потока данных с использованием источника rate
inputStream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

In [8]:
# Преобразуем входные данные, чтобы получить целые числа
numbers = inputStream.select((F.col("value") % 10).alias("id"))
#inputStream: Это поток данных, который получаем из rate, который генерирует последовательные числа.
#В результате, inputStream будет содержать DataFrame с колонкой value, где каждое значение — это целое число, генерируемое в режиме реального времени.
#F.col("value"): Это функция из библиотеки pyspark.sql.functions, которая позволяет ссылаться на колонку value в DataFrame.
# В данном случае, мы обращаемся к колонке, содержащей числа, сгенерированные потоком.
#% 10: Выводим только числа от 0 до 9
#.alias("id"): создаем алиас для колонки содержащей остаток от деления.



In [9]:
# Создаем потоковую агрегацию для суммирования нечётных чисел
odd_numbers = numbers.filter(numbers.id % 2 != 0)

In [10]:
# Инициализация переменной для хранения суммы
total_sum = 0

In [11]:
# Функция для обработки каждого батча
def process_batch(batch_df, batch_id):
    global total_sum  # Используем глобальную переменную для хранения суммы
    print(f"Обработка batch ID: {batch_id}")

    if not batch_df.isEmpty():  # Проверяем, что DataFrame не пустой
        # Преобразуем в Pandas DataFrame для удобства вывода
        pandas_df = batch_df.toPandas()
        # Суммируем только нечетные числа в текущем батче
        batch_sum = pandas_df['id'].sum()
        # Обновляем общую сумму
        total_sum += batch_sum
        # Выводим каждое полученное число и текущую сумму
        for index, row in pandas_df.iterrows():
           print(f"Получено нечетное число: {row['id']}, Текущая общая сумма: {total_sum}")
        # Выводим текущую сумму после обработки батча
        print(f"Общая сумма после после обработки batch {batch_id}: {total_sum}")

In [12]:
# Запускаем поток и используем foreachBatch для обработки каждого батча
query = odd_numbers.writeStream.outputMode("update").foreachBatch(process_batch).start()

In [13]:
# Ожидаем завершения потока (например, 30 секунд)
import time
time.sleep(30)
# Останавливаем поток
query.stop()

Обработка batch ID: 0
Обработка batch ID: 1
Получено нечетное число: 1, Текущая общая сумма: 1
Общая сумма после после обработки batch 1: 1
Обработка batch ID: 2
Получено нечетное число: 3, Текущая общая сумма: 4
Общая сумма после после обработки batch 2: 4
Обработка batch ID: 3
Получено нечетное число: 5, Текущая общая сумма: 9
Общая сумма после после обработки batch 3: 9
Обработка batch ID: 4
Обработка batch ID: 5
Получено нечетное число: 7, Текущая общая сумма: 16
Общая сумма после после обработки batch 5: 16
Обработка batch ID: 6
Обработка batch ID: 7
Получено нечетное число: 9, Текущая общая сумма: 25
Общая сумма после после обработки batch 7: 25
Обработка batch ID: 8
Обработка batch ID: 9
Получено нечетное число: 1, Текущая общая сумма: 26
Общая сумма после после обработки batch 9: 26
Обработка batch ID: 10
Обработка batch ID: 11
Получено нечетное число: 3, Текущая общая сумма: 29
Общая сумма после после обработки batch 11: 29
Обработка batch ID: 12
Обработка batch ID: 13
Получен