<a href="https://colab.research.google.com/github/lastinm/ml_hw_notebooks/blob/main/%D0%A5%D1%80%D0%B0%D0%BD%D0%B5%D0%BD%D0%B8%D0%B5%2C_%D0%B8%D0%B7%D0%B2%D0%BB%D0%B5%D1%87%D0%B5%D0%BD%D0%B8%D0%B5_%D0%B8_%D0%BE%D0%B1%D1%80%D0%B0%D0%B1%D0%BE%D1%82%D0%BA%D0%B0_%D0%91%D0%94_%D0%9B%D0%B0%D0%B1%D0%BE%D1%80%D0%B0%D1%82%D0%BE%D1%80%D0%BD%D0%B0%D1%8F_%D1%80%D0%B0%D0%B1%D0%BE%D1%82%D0%B0_%E2%84%965.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Задание 1. Structured Streaming из набора файлов

In [16]:
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Streaming 1") \
  .config("spark.sql.streaming.schemaInference", "true") \
  .getOrCreate()

1. Подготовить данные, имеющиеся во вложении.
2. Открыть на основе исходных json файлов потоковый датафрейм.

In [17]:
df = spark.readStream \
  .format("json") \
  .option("path", "/content/sample_data") \
  .option("maxFilesPerTrigger", 1) \
  .load()

3. Написать преобразование исходного датафрейма, для получения плоской (flat) структуры.

In [18]:
df2 = df.selectExpr("InvoiceNumber",
                    "CreatedTime",
                    "StoreID",
                    "PosID",
                    "CashierID",
                    "CustomerType",
                    "CustomerCardNo",
                    "TotalAmount",
                    "NumberOfItems",
                    "PaymentMethod",
                    "TaxableAmount",
                    "explode(InvoiceLineItems) as Item")

In [19]:
from pyspark.sql.functions import expr

# [{"ItemCode":"458","ItemDescription":"Wine glass","ItemPrice":1644.0,"ItemQty":2,"TotalValue":3288.0},{"ItemCode":"283","ItemDescription":"Portable Lamps","ItemPrice":2236.0,"ItemQty":1,"TotalValue":2236.0},{"ItemCode":"498","ItemDescription":"Carving knifes","ItemPrice":1424.0,"ItemQty":2,"TotalValue":2848.0},{"ItemCode":"523","ItemDescription":"Oil-lamp clock","ItemPrice":1371.0,"ItemQty":2,"TotalValue":2742.0}]
df3 = df2 \
  .withColumn("ItemCode", expr("Item.ItemCode")) \
  .withColumn("ItemDescription", expr("Item.ItemDescription")) \
  .withColumn("ItemPrice", expr("Item.ItemPrice")) \
  .withColumn("ItemQty", expr("Item.ItemQty")) \
  .withColumn("TotalValue", expr("Item.TotalValue")) \
  .drop("Item")

4. Сохранять получаемые каждые X секунд исходные файлы в json.

In [21]:
query = df3 \
  .writeStream \
  .format("json") \
  .option("path", "/content/output") \
  .option("checkpointLocation", "/content/checkpoint") \
  .trigger(processingTime="10 seconds") \
  .start()

In [25]:
query.lastProgress

{'id': 'b5c88650-07d1-4a2f-80bc-4787b726564a',
 'runId': '36e98a25-8bab-4937-b15d-d2283405e630',
 'name': None,
 'timestamp': '2025-01-04T18:53:40.000Z',
 'batchId': 9,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 4, 'triggerExecution': 6},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[file:/content/sample_data]',
   'startOffset': {'logOffset': 8},
   'endOffset': {'logOffset': 8},
   'latestOffset': None,
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'FileSink[/content/output]', 'numOutputRows': -1}}

In [26]:
spark.stop()

# Задание 2. Применение Tumbling Window

1. Подготовить подходящие данные или воспользоваться продемонстрированным на паре генератором.

In [29]:
from pyspark.sql import SparkSession
spark = SparkSession\
  .builder \
  .appName("Streaming 2") \
  .config("spark.sql.streaming.schemaInference", "true") \
  .getOrCreate()

2. Открыть на основе исходных json файлов потоковый датафрейм.

In [30]:
# Определим структуру будущего датафрейма
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

schema = StructType([
    StructField("createdTime", StringType()),
    StructField("price", DoubleType()),
    StructField("quantity", IntegerType()),
    StructField("type", StringType())
])

In [32]:
# Создаем датафрейм
df = spark.readStream \
    .format("json") \
    .option("path", "/content/input2") \
    .option("maxFilesPerTrigger", 1) \
    .schema(schema) \
    .load()

In [33]:
# Генерация данных
import time
import random
import json


# создаем от 5 до 10 файлов с небольшим количеством объектов
# в каждом файле сдвигаем временную метку на 4 сек
for frame in range(random.randint(5, 20)):
  with open(f"input2/{frame}.json", "w") as file:
    for _ in range(random.randint(10, 20)):
      file.write(json.dumps({
        "createdTime": int(time.time()) + 4 * frame,
        "price": random.randint(1000, 10_000) / 100,
        "quantity":random.randint(100, 2000),
        "type": random.choice(['type1', 'type2', 'type3'])
      }))
      file.write("\n")

In [34]:
# Исходная схема: createdTime - string, а должен быть timestamp
df.printSchema()

root
 |-- createdTime: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- type: string (nullable = true)



In [35]:
from pyspark.sql.functions import to_timestamp, col, current_timestamp, from_unixtime
df_2 = df\
  .withColumn("createdTime", to_timestamp(from_unixtime(col("createdTime"))))

In [36]:
df_2.printSchema()

root
 |-- createdTime: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- type: string (nullable = true)



3. Написать произвольный запрос агрегации, используя функцию tumbling window.

In [37]:
# Создаем агрегацию с оконной функцией, к
# оторая разбивает наши данные на равные отрезки - по 5 секунд,
# в рамках которых и производятся агрегации.
from pyspark.sql.functions import window, col, avg, max



agg_df = df_2 \
    .groupBy(
        window(col("createdTime"), "10 seconds")
    ) \
    .agg(
      avg("price").alias("avg_price"),
      max("quantity").alias("max_quantity"))

In [38]:
result_df = agg_df.select("window.start", "window.end", "avg_price", "max_quantity")

In [39]:
result_df.printSchema()

root
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- max_quantity: integer (nullable = true)



4. Сохранять получаемые каждые X секунд исходные файлы в json.

In [40]:
# Опция `.outputMode("complete")` указыает на то,
# что в результате получаем один df в котором будут актуальные данные (обновляются все строки)
query = result_df.writeStream \
  .format("memory") \
  .outputMode("complete") \
  .option("queryName", "second_query2") \
  .option("checkpointLocation", "check-dir-6") \
  .trigger(processingTime = "5 seconds") \
  .start()

In [44]:
query.lastProgress

{'id': 'c9681f18-f6aa-4f90-99be-5ccf757a17d0',
 'runId': '1cfd9802-d341-487e-8789-412848478856',
 'name': 'second_query2',
 'timestamp': '2025-01-04T19:19:39.188Z',
 'batchId': 0,
 'numInputRows': 16,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.5708169818052087,
 'durationMs': {'addBatch': 27493,
  'commitOffsets': 101,
  'getBatch': 20,
  'latestOffset': 76,
  'queryPlanning': 281,
  'triggerExecution': 28030,
  'walCommit': 48},
 'stateOperators': [{'operatorName': 'stateStoreSave',
   'numRowsTotal': 1,
   'numRowsUpdated': 1,
   'allUpdatesTimeMs': 1474,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 29578,
   'memoryUsedBytes': 45096,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 0,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 16296}}],
 'sources': [{'description': 'FileStreamSource[file:/content/input2]',
   'startOff

In [45]:
spark.sql("SELECT * FROM second_query2").show()

+-------------------+-------------------+-----------------+------------+
|              start|                end|        avg_price|max_quantity|
+-------------------+-------------------+-----------------+------------+
|2025-01-04 19:16:30|2025-01-04 19:16:40|56.81321428571429|        1994|
+-------------------+-------------------+-----------------+------------+



In [46]:
# Запись в файл после кадждого обновления (после каждого нового считанного batch)
currentBatch = 0
while not query.awaitTermination(5):
  lastProgress = query.lastProgress
  print(lastProgress)
  if lastProgress['numInputRows'] == 0:
    query.stop()
  else:
    batchId = lastProgress['batchId']
    if currentBatch <= batchId:
      spark.sql("SELECT * FROM second_query2").write.json(f"output2/{batchId}.json")
      currentBatch = batchId + 1

{'id': 'c9681f18-f6aa-4f90-99be-5ccf757a17d0', 'runId': '1cfd9802-d341-487e-8789-412848478856', 'name': 'second_query2', 'timestamp': '2025-01-04T19:20:44.453Z', 'batchId': 3, 'numInputRows': 18, 'inputRowsPerSecond': 0.9688357823348942, 'processedRowsPerSecond': 0.9986684420772303, 'durationMs': {'addBatch': 17821, 'commitOffsets': 40, 'getBatch': 14, 'latestOffset': 64, 'queryPlanning': 41, 'triggerExecution': 18024, 'walCommit': 44}, 'stateOperators': [{'operatorName': 'stateStoreSave', 'numRowsTotal': 2, 'numRowsUpdated': 1, 'allUpdatesTimeMs': 762, 'numRowsRemoved': 0, 'allRemovalsTimeMs': 0, 'commitTimeMs': 24479, 'memoryUsedBytes': 86864, 'numRowsDroppedByWatermark': 0, 'numShufflePartitions': 200, 'numStateStoreInstances': 200, 'customMetrics': {'loadedMapCacheHitCount': 1200, 'loadedMapCacheMissCount': 0, 'stateOnCurrentVersionSizeBytes': 21232}}], 'sources': [{'description': 'FileStreamSource[file:/content/input2]', 'startOffset': {'logOffset': 2}, 'endOffset': {'logOffset': 

In [47]:
# Запись в файл после считывания всех файлов (легче и предпочтительнее)
currentBatch = 0
while not query.awaitTermination(5):
  lastProgress = query.lastProgress
  print(lastProgress)
  if lastProgress['numInputRows'] == 0:
    spark.sql("SELECT * FROM second_query2").write.json(f"output2/{batchId}.json")
    query.stop()

In [48]:
!rm -r /content/check-dir-6/
!rm -r /content/input2/
!mkdir /content/input2

In [49]:
spark.stop()